This commit is contained in:
Markus Makela
2014-09-04 10:36:59 +03:00
80 changed files with 2981 additions and 1082 deletions

View File

@ -81,7 +81,7 @@ tags:
(cd hint; touch depend.mk; make tags)
depend:
@rm -f depend.mk
@$(DEL) depend.mk
cc -M $(CFLAGS) $(SRCS) > depend.mk
(cd hint; touch depend.mk; make depend)

View File

@ -17,6 +17,9 @@
*/
/**
* @file qlafilter.c - Quary Log All Filter
* @verbatim
*
* QLA Filter - Query Log All. A primitive query logging filter, simply
* used to verify the filter mechanism for downstream filters. All queries
* that are passed through the filter will be written to file.
@ -33,6 +36,7 @@
* 11/06/2014 Mark Riddoch Addition of source and match parameters
* 19/06/2014 Mark Riddoch Addition of user parameter
*
* @endverbatim
*/
#include <stdio.h>
#include <fcntl.h>
@ -154,6 +158,7 @@ GetModuleObject()
* within MaxScale.
*
* @param options The options for this filter
* @param params The array of name/value pair parameters for the filter
*
* @return The instance data for this new instance
*/

View File

@ -27,7 +27,8 @@
extern int lm_enabled_logfiles_bitmask;
/**
* regexfilter.c - a very simple regular expression rewrite filter.
* @file regexfilter.c - a very simple regular expression rewrite filter.
* @verbatim
*
* A simple regular expression query rewrite filter.
* Two parameters should be defined in the filter configuration
@ -39,6 +40,7 @@ extern int lm_enabled_logfiles_bitmask;
*
* Date Who Description
* 19/06/2014 Mark Riddoch Addition of source and user parameters
* @endverbatim
*/
MODULE_INFO info = {
@ -132,6 +134,7 @@ GetModuleObject()
* within MaxScale.
*
* @param options The options for this filter
* @param params The array of name/value pair parameters for the filter
*
* @return The instance data for this new instance
*/

View File

@ -18,6 +18,7 @@
/**
* @file tee.c A filter that splits the processing pipeline in two
* @verbatim
*
* Conditionally duplicate requests and send the duplicates to another service
* within MaxScale.
@ -41,6 +42,7 @@
* 20/06/2014 Mark Riddoch Initial implementation
* 24/06/2014 Mark Riddoch Addition of support for multi-packet queries
*
* @endverbatim
*/
#include <stdio.h>
#include <fcntl.h>
@ -162,6 +164,7 @@ GetModuleObject()
* within MaxScale.
*
* @param options The options for this filter
* @param params The array of name/value pair parameters for the filter
*
* @return The instance data for this new instance
*/

View File

@ -396,8 +396,9 @@ void print_help()
{
printf("\nFilter Test Harness\n\n"
"List of commands:\n %-32s%s\n %-32s%s\n %-32s%s\n %-32s%s\n %-32s%s\n"
"%-32s%s\n %-32s%s\n %-32s%s\n %-32s%s\n %-32s%s\n %-32s%s\n %-32s%s\n"
"List of commands:\n %-32s%s\n %-32s%s\n %-32s%s\n %-32s%s\n %-32s%s\n "
"%-32s%s\n %-32s%s\n %-32s%s\n %-32s%s\n %-32s%s\n %-32s%s\n %-32s%s\n "
"%-32s%s\n %-32s%s\n"
,"help","Prints this help message."
,"run","Feeds the contents of the buffer to the filter chain."
,"add <filter name>","Loads a filter and appeds it to the end of the chain."
@ -407,6 +408,8 @@ void print_help()
,"config <file name>","Loads filter configurations from a file."
,"in <file name>","Source file for the SQL statements."
,"out <file name>","Destination file for the SQL statements. Defaults to stdout if no parameters were passed."
,"threads <number>","Sets the amount of threads to use"
,"sessions <number>","How many sessions to create for each filter. This clears all loaded filters."
,"quiet","Print only error messages."
,"verbose","Print everything."
,"exit","Exit the program"
@ -490,9 +493,14 @@ FILTER_PARAMETER** read_params(int* paramc)
int routeQuery(void* ins, void* session, GWBUF* queue)
{
int buffsz = (int)(queue->end - (queue->start + 5));
unsigned int buffsz = 0;
unsigned char* ptr = (void*)queue->start;
char *qstr;
buffsz += *ptr++;
buffsz += *ptr++ << 8;
buffsz += *ptr++ << 16;
if(queue->hint){
buffsz += 40;
if(queue->hint->data){
@ -506,7 +514,7 @@ int routeQuery(void* ins, void* session, GWBUF* queue)
qstr = calloc(buffsz,sizeof(char));
if(qstr){
memcpy(qstr,queue->start + 5,(int)(queue->end - 1 - (queue->start + 5)));
memcpy(qstr,queue->start + 5,buffsz - 1);
if(queue->hint){
char *ptr = qstr + (int)(queue->end - 1 - (queue->start + 5));
@ -621,9 +629,9 @@ void manual_query()
gwbuf_set_type(instance.buffer[0],GWBUF_TYPE_MYSQL);
memcpy(instance.buffer[0]->sbuf->data + 5,query,qlen);
instance.buffer[0]->sbuf->data[0] = (qlen>>0&1)|(qlen>>1&1) << 1;
instance.buffer[0]->sbuf->data[1] = (qlen>>2&1)|(qlen>>3&1) << 1;
instance.buffer[0]->sbuf->data[2] = (qlen>>4&1)|(qlen>>5&1) << 1;
instance.buffer[0]->sbuf->data[0] = (qlen);
instance.buffer[0]->sbuf->data[1] = (qlen << 8);
instance.buffer[0]->sbuf->data[2] = (qlen << 16);
instance.buffer[0]->sbuf->data[3] = 0x00;
instance.buffer[0]->sbuf->data[4] = 0x03;
@ -706,9 +714,9 @@ int load_query()
memcpy(tmpbff[i]->sbuf->data + 5,query_list[i],strnlen(query_list[i],buff_sz));
qlen = strnlen(query_list[i],buff_sz);
tmpbff[i]->sbuf->data[0] = (qlen>>0&1)|(qlen>>1&1) << 1;
tmpbff[i]->sbuf->data[1] = (qlen>>2&1)|(qlen>>3&1) << 1;
tmpbff[i]->sbuf->data[2] = (qlen>>4&1)|(qlen>>5&1) << 1;
tmpbff[i]->sbuf->data[0] = qlen;
tmpbff[i]->sbuf->data[1] = (qlen << 8);
tmpbff[i]->sbuf->data[2] = (qlen << 16);
tmpbff[i]->sbuf->data[3] = 0x00;
tmpbff[i]->sbuf->data[4] = 0x03;

View File

@ -21,13 +21,15 @@
#include <modutil.h>
/**
* testfilter.c - a very simple test filter.
* @file testfilter.c - a very simple test filter.
* @verbatim
*
* This filter is a very simple example used to test the filter API,
* it merely counts the number of statements that flow through the
* filter pipeline.
*
* Reporting is done via the diagnostics print routine.
* @endverbatim
*/
MODULE_INFO info = {
@ -114,6 +116,7 @@ GetModuleObject()
* within MaxScale.
*
* @param options The options for this filter
* @param params The array of name/value pair parameters for the filter
*
* @return The instance data for this new instance
*/

View File

@ -17,6 +17,9 @@
*/
/**
* @file topfilter.c - Top N Longest Running Queries
* @verbatim
*
* TOPN Filter - Query Log All. A primitive query logging filter, simply
* used to verify the filter mechanism for downstream filters. All queries
* that are passed through the filter will be written to file.
@ -30,6 +33,8 @@
*
* Date Who Description
* 18/06/2014 Mark Riddoch Addition of source and user filters
*
* @endverbatim
*/
#include <stdio.h>
#include <fcntl.h>
@ -172,6 +177,7 @@ GetModuleObject()
* within MaxScale.
*
* @param options The options for this filter
* @param params The array of name/value pair parameters for the filter
*
* @return The instance data for this new instance
*/

View File

@ -48,20 +48,18 @@
#define HTTPD_FIELD_MAXLEN 8192
#define HTTPD_REQUESTLINE_MAXLEN 8192
typedef enum {
METHOD_UNKNOWN = 0,
METHOD_POST,
METHOD_PUT,
METHOD_GET,
METHOD_HEAD
} HTTP_METHOD;
/**
* HTTPD session specific data
*
*/
typedef struct httpd_session {
HTTP_METHOD method;
GWBUF *saved;
int request_len;
char *url;
char user[HTTPD_USER_MAXLEN]; /*< username for authentication*/
char *cookies; /*< all input cookies */
char hostname[HTTPD_HOSTNAME_MAXLEN]; /*< The hostname */
char useragent[HTTPD_USERAGENT_MAXLEN]; /*< The useragent */
char method[HTTPD_METHOD_MAXLEN]; /*< The HTTPD Method */
char *url; /*< the URL in the request */
char *path_info; /*< the Pathinfo, starts with /, is the extra path segments after the document name */
char *query_string; /*< the Query string, starts with ?, after path_info and document name */
int headers_received; /*< All the headers has been received, if 1 */
} HTTPD_session;

View File

@ -102,6 +102,12 @@ typedef enum {
MYSQL_IDLE
} mysql_auth_state_t;
typedef enum {
MYSQL_PROTOCOL_ALLOC,
MYSQL_PROTOCOL_ACTIVE,
MYSQL_PROTOCOL_DONE
} mysql_protocol_state_t;
/*
* MySQL session specific data
@ -270,6 +276,7 @@ typedef struct {
server_command_t protocol_command; /*< session command list */
server_command_t* protocol_cmd_history; /*< session command history */
mysql_auth_state_t protocol_auth_state; /*< Authentication status */
mysql_protocol_state_t protocol_state; /*< Protocol struct status */
uint8_t scramble[MYSQL_SCRAMBLE_LEN]; /*< server scramble,
* created or received */
uint32_t server_capabilities; /*< server capabilities,

View File

@ -92,7 +92,8 @@ typedef enum rses_property_type_t {
RSES_PROP_TYPE_UNDEFINED=-1,
RSES_PROP_TYPE_SESCMD=0,
RSES_PROP_TYPE_FIRST = RSES_PROP_TYPE_SESCMD,
RSES_PROP_TYPE_LAST=RSES_PROP_TYPE_SESCMD,
RSES_PROP_TYPE_TMPTABLES,
RSES_PROP_TYPE_LAST=RSES_PROP_TYPE_TMPTABLES,
RSES_PROP_TYPE_COUNT=RSES_PROP_TYPE_LAST+1
} rses_property_type_t;
@ -157,7 +158,7 @@ struct rses_property_st {
rses_property_type_t rses_prop_type;
union rses_prop_data {
mysql_sescmd_t sescmd;
void* placeholder; /*< to be removed due new type */
HASHTABLE* temp_tables;
} rses_prop_data;
rses_property_t* rses_prop_next; /*< next property of same type */
#if defined(SS_DEBUG)

View File

@ -20,6 +20,8 @@
# 28/07/14 Massimiliano Pinto new monitor ndbcluster added
include ../../../build_gateway.inc
include ../../../makefile.inc
LOGPATH := $(ROOT_PATH)/log_manager
UTILSPATH := $(ROOT_PATH)/utils
@ -61,13 +63,13 @@ libndbclustermon.so: $(NDBCLUSTEROBJ)
$(CC) $(CFLAGS) $< -o $@
clean:
rm -f $(OBJ) $(MODULES)
$(DEL) $(OBJ) $(MODULES)
tags:
ctags $(SRCS) $(HDRS)
depend:
@rm -f depend.mk
@$(DEL) depend.mk
cc -M $(CFLAGS) $(SRCS) > depend.mk
install: $(MODULES)

View File

@ -76,7 +76,7 @@ libmaxscaled.so: $(MAXSCALEDOBJ)
$(CC) $(CFLAGS) $< -o $@
clean:
rm -f $(OBJ) $(MODULES)
$(DEL) $(OBJ) $(MODULES)
tags:
ctags $(SRCS) $(HDRS)
@ -85,7 +85,7 @@ install: $(MODULES)
install -D $(MODULES) $(DEST)/modules
depend:
rm -f depend.mk
@$(DEL) depend.mk
cc -M $(CFLAGS) $(SRCS) > depend.mk
include depend.mk

View File

@ -33,8 +33,6 @@
* Date Who Description
* 08/07/2013 Massimiliano Pinto Initial version
* 09/07/2013 Massimiliano Pinto Added /show?dcb|session for all dcbs|sessions
* 11/07/2014 Mark Riddoch Recoded as more generic protocol module
* removing hardcoded example
*
* @endverbatim
*/
@ -42,10 +40,6 @@
#include <httpd.h>
#include <gw.h>
#include <modinfo.h>
#include <skygw_utils.h>
#include <log_manager.h>
extern int lm_enabled_logfiles_bitmask;
MODULE_INFO info = {
MODULE_API_PROTOCOL,
@ -54,6 +48,7 @@ MODULE_INFO info = {
"An experimental HTTPD implementation for use in admnistration"
};
#define ISspace(x) isspace((int)(x))
#define HTTP_SERVER_STRING "Gateway(c) v.1.0.0"
static char *version_str = "V1.0.1";
@ -65,8 +60,8 @@ static int httpd_hangup(DCB *dcb);
static int httpd_accept(DCB *dcb);
static int httpd_close(DCB *dcb);
static int httpd_listen(DCB *dcb, char *config);
static char *httpd_nextline(GWBUF *buf, char *ptr);
static void httpd_process_header(GWBUF *buf, char *sol, HTTPD_session *client_data);
static int httpd_get_line(int sock, char *buf, int size);
static void httpd_send_headers(DCB *dcb, int final);
/**
* The "module object" for the httpd protocol module.
@ -126,99 +121,132 @@ GetModuleObject()
* @return
*/
static int
httpd_read_event(DCB *dcb)
httpd_read_event(DCB* dcb)
{
SESSION *session = dcb->session;
GWBUF *buf = NULL;
char *ptr, *sol;
HTTPD_session *client_data = NULL;
int n;
//SESSION *session = dcb->session;
//ROUTER_OBJECT *router = session->service->router;
//ROUTER *router_instance = session->service->router_instance;
//void *rsession = session->router_session;
// Read all the available data
if ((n = dcb_read(dcb, &buf)) != -1)
{
client_data = dcb->data;
int numchars = 1;
char buf[HTTPD_REQUESTLINE_MAXLEN-1] = "";
char *query_string = NULL;
char method[HTTPD_METHOD_MAXLEN-1] = "";
char url[HTTPD_SMALL_BUFFER] = "";
int cgi = 0;
size_t i, j;
int headers_read = 0;
HTTPD_session *client_data = NULL;
if (client_data->saved)
{
buf = gwbuf_append(client_data->saved, buf);
client_data->saved = NULL;
}
buf = gwbuf_make_contiguous(buf);
ptr = GWBUF_DATA(buf);
if (strncasecmp(ptr, "POST", 4))
{
client_data->method = METHOD_POST;
gwbuf_add_property(buf, "Method", "POST");
ptr = ptr + 4;
}
else if (strncasecmp(ptr, "PUT", 3))
{
client_data->method = METHOD_PUT;
gwbuf_add_property(buf, "Method", "PUT");
ptr = ptr + 3;
}
else if (strncasecmp(ptr, "GET", 3))
{
client_data->method = METHOD_GET;
gwbuf_add_property(buf, "Method", "GET");
ptr = ptr + 3;
}
else if (strncasecmp(ptr, "HEAD", 4))
{
client_data->method = METHOD_HEAD;
gwbuf_add_property(buf, "Method", "HEAD");
ptr = ptr + 4;
}
while (ptr < (char *)(buf->end) && isspace(*ptr))
ptr++;
sol = ptr;
while (ptr < (char *)(buf->end) && isspace(*ptr) == 0)
ptr++;
client_data->url = strndup(sol, ptr - sol);
gwbuf_add_property(buf, "URL", client_data->url);
while ((sol = httpd_nextline(buf, ptr)) != NULL &&
*sol != '\n' && *sol != '\r')
{
httpd_process_header(buf, sol, client_data);
ptr = sol;
}
/*
* We have read all the headers, or run out of data to
* examine.
*/
if (sol == NULL)
{
client_data->saved = buf;
return 0;
}
else
{
if (((char *)(buf->end) - sol)
< client_data->request_len)
{
client_data->saved = buf;
}
else
{
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"HTTPD: request %s.\n", client_data->url)));
SESSION_ROUTE_QUERY(session, buf);
if (client_data->url)
{
free(client_data->url);
client_data->url = NULL;
}
}
}
client_data = dcb->data;
/**
* get the request line
* METHOD URL HTTP_VER\r\n
*/
numchars = httpd_get_line(dcb->fd, buf, sizeof(buf));
i = 0; j = 0;
while (!ISspace(buf[j]) && (i < sizeof(method) - 1)) {
method[i] = buf[j];
i++; j++;
}
method[i] = '\0';
strcpy(client_data->method, method);
/* check allowed http methods */
if (strcasecmp(method, "GET") && strcasecmp(method, "POST")) {
//httpd_unimplemented(dcb->fd);
return 0;
}
if (strcasecmp(method, "POST") == 0)
cgi = 1;
i = 0;
while (ISspace(buf[j]) && (j < sizeof(buf))) {
j++;
}
while (!ISspace(buf[j]) && (i < sizeof(url) - 1) && (j < sizeof(buf))) {
url[i] = buf[j];
i++; j++;
}
url[i] = '\0';
/**
* Get the query string if availble
*/
if (strcasecmp(method, "GET") == 0) {
query_string = url;
while ((*query_string != '?') && (*query_string != '\0'))
query_string++;
if (*query_string == '?') {
cgi = 1;
*query_string = '\0';
query_string++;
}
}
/**
* Get the request headers
*/
while ((numchars > 0) && strcmp("\n", buf)) {
char *value = NULL;
char *end = NULL;
numchars = httpd_get_line(dcb->fd, buf, sizeof(buf));
if ( (value = strchr(buf, ':'))) {
*value = '\0';
value++;
end = &value[strlen(value) -1];
*end = '\0';
if (strncasecmp(buf, "Hostname", 6) == 0) {
strcpy(client_data->hostname, value);
}
if (strncasecmp(buf, "useragent", 9) == 0) {
strcpy(client_data->useragent, value);
}
}
}
if (numchars) {
headers_read = 1;
memcpy(&client_data->headers_received, &headers_read, sizeof(int));
}
/**
* Now begins the server reply
*/
/* send all the basic headers and close with \r\n */
httpd_send_headers(dcb, 1);
/**
* ToDO: launch proper content handling based on the requested URI, later REST interface
*
*/
dcb_printf(dcb, "Welcome to HTTPD Gateway (c) %s\n\n", version_str);
if (strcmp(url, "/show") == 0) {
if (strlen(query_string)) {
if (strcmp(query_string, "dcb") == 0)
dprintAllDCBs(dcb);
if (strcmp(query_string, "session") == 0)
dprintAllSessions(dcb);
}
}
/* force the client connecton close */
dcb_close(dcb);
return 0;
}
@ -259,18 +287,6 @@ httpd_write(DCB *dcb, GWBUF *queue)
static int
httpd_error(DCB *dcb)
{
HTTPD_session *client_data = NULL;
if (dcb->data)
{
client_data = dcb->data;
if (client_data->url)
{
free(client_data->url);
client_data->url = NULL;
}
free(dcb->data);
dcb->data = NULL;
}
dcb_close(dcb);
return 0;
}
@ -302,7 +318,7 @@ int n_connect = 0;
{
int so = -1;
struct sockaddr_in addr;
socklen_t addrlen = 0;
socklen_t addrlen;
DCB *client = NULL;
HTTPD_session *client_data = NULL;
@ -317,11 +333,10 @@ int n_connect = 0;
memcpy(&client->func, &MyObject, sizeof(GWPROTOCOL));
/* we don't need the session */
client->session = session_alloc(dcb->session->service, client);
client->session = NULL;
/* create the session data for HTTPD */
client_data = (HTTPD_session *)calloc(1, sizeof(HTTPD_session));
memset(client_data, 0, sizeof(HTTPD_session));
client->data = client_data;
if (poll_add_dcb(client) == -1)
@ -410,84 +425,51 @@ int rc;
}
/**
* Return the start of the next line int the buffer.
*
* @param buf The GWBUF chain
* @param ptr Start point within the buffer
*
* @return the start of the next line or NULL if there are no more lines
* HTTPD get line from client
*/
static char *
httpd_nextline(GWBUF *buf, char *ptr)
{
while (ptr < (char *)(buf->end) && *ptr != '\n' && *ptr != '\r')
ptr++;
if (ptr >= (char *)(buf->end))
return NULL;
static int httpd_get_line(int sock, char *buf, int size) {
int i = 0;
char c = '\0';
int n;
/* Skip prcisely one CR/LF */
if (*ptr == '\r')
ptr++;
if (*ptr == '\n')
ptr++;
return ptr;
}
/**
* The headers to extract from the HTTP request and add as properties to the
* GWBUF structure.
*/
static char *headers[] = {
"Content-Type",
"User-Agent",
"From",
"Date",
NULL
};
/**
* Process a single header line
*
* @param buf The GWBUF that contains the request
* @param sol The current start of line
* @param client_data The client data structure for this request
*/
static void
httpd_process_header(GWBUF *buf, char *sol, HTTPD_session *client_data)
{
char *ptr = sol;
char cbuf[300];
int len, i;
/* Find the end of the line */
while (ptr < (char *)(buf->end) && *ptr != '\n' && *ptr != '\r')
ptr++;
if (strncmp(sol, "Content-Length:", strlen("Content-Length:")) == 0)
{
char *p1 = sol + strlen("Content-Length:");
while (isspace(*p1))
p1++;
len = ptr - p1;
strncpy(cbuf, p1, len);
cbuf[len] = 0;
client_data->request_len = atoi(cbuf);
gwbuf_add_property(buf, "Content-Length", cbuf);
}
else
{
for (i = 0; headers[i]; i++)
{
if (strncmp(sol, headers[i], strlen(headers[i])) == 0)
{
char *p1 = sol + strlen(headers[i]) + 1;
while (isspace(*p1))
p1++;
len = ptr - p1;
strncpy(cbuf, p1, len);
cbuf[len] = 0;
gwbuf_add_property(buf, headers[i], cbuf);
while ((i < size - 1) && (c != '\n')) {
n = recv(sock, &c, 1, 0);
/* DEBUG printf("%02X\n", c); */
if (n > 0) {
if (c == '\r') {
n = recv(sock, &c, 1, MSG_PEEK);
/* DEBUG printf("%02X\n", c); */
if ((n > 0) && (c == '\n'))
recv(sock, &c, 1, 0);
else
c = '\n';
}
}
buf[i] = c;
i++;
} else
c = '\n';
}
buf[i] = '\0';
return i;
}
/**
* HTTPD send basic headers with 200 OK
*/
static void httpd_send_headers(DCB *dcb, int final)
{
char date[64] = "";
const char *fmt = "%a, %d %b %Y %H:%M:%S GMT";
time_t httpd_current_time = time(NULL);
strftime(date, sizeof(date), fmt, localtime(&httpd_current_time));
dcb_printf(dcb, "HTTP/1.1 200 OK\r\nDate: %s\r\nServer: %s\r\nConnection: close\r\nContent-Type: text/plain\r\n", date, HTTP_SERVER_STRING);
/* close the headers */
if (final) {
dcb_printf(dcb, "\r\n");
}
}

View File

@ -761,12 +761,13 @@ return_rc:
*/
static int gw_error_backend_event(DCB *dcb)
{
SESSION* session;
void* rsession;
ROUTER_OBJECT* router;
ROUTER* router_instance;
GWBUF* errbuf;
bool succp;
SESSION* session;
void* rsession;
ROUTER_OBJECT* router;
ROUTER* router_instance;
GWBUF* errbuf;
bool succp;
session_state_t ses_state;
CHK_DCB(dcb);
session = dcb->session;
@ -775,11 +776,6 @@ static int gw_error_backend_event(DCB *dcb)
router = session->service->router;
router_instance = session->service->router_instance;
#if defined(SS_DEBUG)
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Backend error event handling.")));
#endif
/**
* Avoid running redundant error handling procedure.
* dcb_close is already called for the DCB. Thus, either connection is
@ -795,6 +791,34 @@ static int gw_error_backend_event(DCB *dcb)
0,
"Lost connection to backend server.");
spinlock_acquire(&session->ses_lock);
ses_state = session->state;
spinlock_release(&session->ses_lock);
/**
* Session might be initialized when DCB already is in the poll set.
* Thus hangup can occur in the middle of session initialization.
* Only complete and successfully initialized sessions allow for
* calling error handler.
*/
while (ses_state == SESSION_STATE_READY)
{
spinlock_acquire(&session->ses_lock);
ses_state = session->state;
spinlock_release(&session->ses_lock);
}
if (ses_state != SESSION_STATE_ROUTER_READY)
{
gwbuf_free(errbuf);
goto retblock;
}
#if defined(SS_DEBUG)
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Backend error event handling.")));
#endif
router->handleError(router_instance,
rsession,
errbuf,
@ -810,6 +834,7 @@ static int gw_error_backend_event(DCB *dcb)
}
dcb_close(dcb);
retblock:
return 1;
}
@ -923,12 +948,13 @@ return_fd:
static int
gw_backend_hangup(DCB *dcb)
{
SESSION* session;
void* rsession;
ROUTER_OBJECT* router;
ROUTER* router_instance;
bool succp;
GWBUF* errbuf;
SESSION* session;
void* rsession;
ROUTER_OBJECT* router;
ROUTER* router_instance;
bool succp;
GWBUF* errbuf;
session_state_t ses_state;
CHK_DCB(dcb);
session = dcb->session;
@ -936,20 +962,41 @@ gw_backend_hangup(DCB *dcb)
rsession = session->router_session;
router = session->service->router;
router_instance = session->service->router_instance;
router_instance = session->service->router_instance;
errbuf = mysql_create_custom_error(
1,
0,
"Lost connection to backend server.");
spinlock_acquire(&session->ses_lock);
ses_state = session->state;
spinlock_release(&session->ses_lock);
/**
* Session might be initialized when DCB already is in the poll set.
* Thus hangup can occur in the middle of session initialization.
* Only complete and successfully initialized sessions allow for
* calling error handler.
*/
while (ses_state == SESSION_STATE_READY)
{
spinlock_acquire(&session->ses_lock);
ses_state = session->state;
spinlock_release(&session->ses_lock);
}
if (ses_state != SESSION_STATE_ROUTER_READY)
{
gwbuf_free(errbuf);
goto retblock;
}
#if defined(SS_DEBUG)
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Backend hangup error handling.")));
#endif
errbuf = mysql_create_custom_error(
1,
0,
"Lost connection to backend server.");
router->handleError(router_instance,
rsession,
errbuf,
@ -958,7 +1005,8 @@ gw_backend_hangup(DCB *dcb)
&succp);
/** There are not required backends available, close session. */
if (!succp) {
if (!succp)
{
#if defined(SS_DEBUG)
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
@ -971,7 +1019,8 @@ gw_backend_hangup(DCB *dcb)
}
dcb_close(dcb);
return 1;
retblock:
return 1;
}
/**

View File

@ -65,7 +65,7 @@ static int gw_client_hangup_event(DCB *dcb);
int mysql_send_ok(DCB *dcb, int packet_number, int in_affected_rows, const char* mysql_message);
int MySQLSendHandshake(DCB* dcb);
static int gw_mysql_do_authentication(DCB *dcb, GWBUF *queue);
static int route_by_statement(SESSION *, GWBUF *);
static int route_by_statement(SESSION *, GWBUF **);
/*
* The "module object" for the mysqld client protocol module.
@ -534,7 +534,7 @@ int gw_read_client_event(
if (nbytes_read == 0)
{
goto return_rc;
}
}
/**
* if read queue existed appent read to it.
* if length of read buffer is less than 3 or less than mysql packet
@ -783,7 +783,7 @@ int gw_read_client_event(
* Feed each statement completely and separately
* to router.
*/
rc = route_by_statement(session, read_buffer);
rc = route_by_statement(session, &read_buffer);
if (read_buffer != NULL)
{
@ -1300,12 +1300,19 @@ static int gw_error_client_event(
STRDCBSTATE(dcb->state),
(session != NULL ? session : NULL))));
if (session != NULL && session->state == SESSION_STATE_STOPPING)
{
goto retblock;
}
#if defined(SS_DEBUG)
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Client error event handling.")));
#endif
dcb_close(dcb);
retblock:
return 1;
}
@ -1380,12 +1387,19 @@ gw_client_hangup_event(DCB *dcb)
{
CHK_SESSION(session);
}
if (session != NULL && session->state == SESSION_STATE_STOPPING)
{
goto retblock;
}
#if defined(SS_DEBUG)
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Client hangup error handling.")));
"Client hangup error handling.")));
#endif
dcb_close(dcb);
retblock:
return 1;
}
@ -1399,14 +1413,16 @@ gw_client_hangup_event(DCB *dcb)
* Return 1 in success. If the last packet is incomplete return success but
* leave incomplete packet to readbuf.
*/
static int route_by_statement(SESSION *session, GWBUF *readbuf)
static int route_by_statement(
SESSION* session,
GWBUF** p_readbuf)
{
int rc = -1;
GWBUF* packetbuf;
#if defined(SS_DEBUG)
GWBUF* tmpbuf;
tmpbuf = readbuf;
tmpbuf = *p_readbuf;
while (tmpbuf != NULL)
{
ss_dassert(GWBUF_IS_TYPE_MYSQL(tmpbuf));
@ -1415,15 +1431,14 @@ static int route_by_statement(SESSION *session, GWBUF *readbuf)
#endif
do
{
ss_dassert(GWBUF_IS_TYPE_MYSQL(readbuf));
ss_dassert(GWBUF_IS_TYPE_MYSQL((*p_readbuf)));
packetbuf = gw_MySQL_get_next_packet(&readbuf);
ss_dassert(GWBUF_IS_TYPE_MYSQL(packetbuf));
packetbuf = gw_MySQL_get_next_packet(p_readbuf);
if (packetbuf != NULL)
{
CHK_GWBUF(packetbuf);
ss_dassert(GWBUF_IS_TYPE_MYSQL(packetbuf));
/**
* This means that buffer includes exactly one MySQL
* statement.
@ -1446,7 +1461,7 @@ static int route_by_statement(SESSION *session, GWBUF *readbuf)
goto return_rc;
}
}
while (readbuf != NULL);
while (*p_readbuf != NULL);
return_rc:
return rc;

View File

@ -79,6 +79,7 @@ MySQLProtocol* mysql_protocol_init(
strerror(eno))));
goto return_p;
}
p->protocol_state = MYSQL_PROTOCOL_ALLOC;
p->protocol_auth_state = MYSQL_ALLOC;
p->protocol_command.scom_cmd = MYSQL_COM_UNDEFINED;
p->protocol_command.scom_nresponse_packets = 0;
@ -90,6 +91,7 @@ MySQLProtocol* mysql_protocol_init(
/*< Assign fd with protocol */
p->fd = fd;
p->owner_dcb = dcb;
p->protocol_state = MYSQL_PROTOCOL_ACTIVE;
CHK_PROTOCOL(p);
return_p:
return p;
@ -107,15 +109,30 @@ return_p:
void mysql_protocol_done (
DCB* dcb)
{
server_command_t* scmd = ((MySQLProtocol *)dcb->protocol)->protocol_cmd_history;
MySQLProtocol* p;
server_command_t* scmd;
server_command_t* scmd2;
p = (MySQLProtocol *)dcb->protocol;
spinlock_acquire(&p->protocol_lock);
if (p->protocol_state != MYSQL_PROTOCOL_ACTIVE)
{
goto retblock;
}
scmd = p->protocol_cmd_history;
while (scmd != NULL)
{
scmd2 = scmd->scom_next;
free(scmd);
scmd = scmd2;
}
p->protocol_state = MYSQL_PROTOCOL_DONE;
retblock:
spinlock_release(&p->protocol_lock);
}
@ -886,7 +903,7 @@ int mysql_send_com_quit(
if (buf == NULL)
{
return 0;
}
}
nbytes = dcb->func.write(dcb, buf);
return nbytes;
@ -1497,6 +1514,9 @@ GWBUF* gw_MySQL_get_next_packet(
size_t packetlen;
size_t totalbuflen;
uint8_t* data;
size_t nbytes_copied = 0;
uint8_t* target;
readbuf = *p_readbuf;
if (readbuf == NULL)
@ -1523,42 +1543,27 @@ GWBUF* gw_MySQL_get_next_packet(
packetbuf = NULL;
goto return_packetbuf;
}
/** there is one complete packet in the buffer */
if (packetlen == buflen)
{
packetbuf = gwbuf_clone_portion(readbuf, 0, packetlen);
*p_readbuf = gwbuf_consume(readbuf, packetlen);
goto return_packetbuf;
}
packetbuf = gwbuf_alloc(packetlen);
target = GWBUF_DATA(packetbuf);
packetbuf->gwbuf_type = readbuf->gwbuf_type; /*< Copy the type too */
/**
* Packet spans multiple buffers.
* Allocate buffer for complete packet
* copy packet parts into it and consume copied bytes
*/
else if (packetlen > buflen)
* Copy first MySQL packet to packetbuf and leave posible other
* packets to read buffer.
*/
while (nbytes_copied < packetlen && totalbuflen > 0)
{
size_t nbytes_copied = 0;
uint8_t* target;
uint8_t* src = GWBUF_DATA((*p_readbuf));
size_t bytestocopy;
packetbuf = gwbuf_alloc(packetlen);
target = GWBUF_DATA(packetbuf);
while (nbytes_copied < packetlen)
{
uint8_t* src = GWBUF_DATA(readbuf);
size_t buflen = GWBUF_LENGTH(readbuf);
memcpy(target+nbytes_copied, src, buflen);
*p_readbuf = gwbuf_consume(readbuf, buflen);
nbytes_copied += buflen;
}
ss_dassert(nbytes_copied == packetlen);
}
else
{
packetbuf = gwbuf_clone_portion(readbuf, 0, packetlen);
*p_readbuf = gwbuf_consume(readbuf, packetlen);
bytestocopy = MIN(buflen,packetlen-nbytes_copied);
memcpy(target+nbytes_copied, src, bytestocopy);
*p_readbuf = gwbuf_consume((*p_readbuf), bytestocopy);
totalbuflen = gwbuf_length((*p_readbuf));
nbytes_copied += bytestocopy;
}
ss_dassert(buflen == 0 || nbytes_copied == packetlen);
return_packetbuf:
return packetbuf;
@ -1625,11 +1630,16 @@ void protocol_archive_srv_command(
MySQLProtocol* p)
{
server_command_t* s1;
server_command_t** s2;
server_command_t* h1;
int len = 0;
spinlock_acquire(&p->protocol_lock);
if (p->protocol_state != MYSQL_PROTOCOL_ACTIVE)
{
goto retblock;
}
s1 = &p->protocol_command;
LOGIF(LT, (skygw_log_write(
@ -1639,18 +1649,19 @@ void protocol_archive_srv_command(
p->owner_dcb->fd)));
/** Copy to history list */
s2 = &p->protocol_cmd_history;
if (*s2 != NULL)
{
while ((*s2)->scom_next != NULL)
{
*s2 = (*s2)->scom_next;
len += 1;
}
if ((h1 = p->protocol_cmd_history) == NULL)
{
p->protocol_cmd_history = server_command_copy(s1);
}
*s2 = server_command_copy(s1);
else
{
while (h1->scom_next != NULL)
{
h1 = h1->scom_next;
}
h1->scom_next = server_command_copy(s1);
}
/** Keep history limits, remove oldest */
if (len > MAX_CMD_HISTORY)
{
@ -1669,6 +1680,8 @@ void protocol_archive_srv_command(
p->protocol_command = *(s1->scom_next);
free(s1->scom_next);
}
retblock:
spinlock_release(&p->protocol_lock);
}
@ -1685,6 +1698,10 @@ void protocol_add_srv_command(
spinlock_acquire(&p->protocol_lock);
if (p->protocol_state != MYSQL_PROTOCOL_ACTIVE)
{
goto retblock;
}
/** this is the only server command in protocol */
if (p->protocol_command.scom_cmd == MYSQL_COM_UNDEFINED)
{
@ -1717,6 +1734,7 @@ void protocol_add_srv_command(
c = c->scom_next;
}
#endif
retblock:
spinlock_release(&p->protocol_lock);
}

View File

@ -44,13 +44,10 @@ DEBUGCLISRCS=debugcli.c debugcmd.c
DEBUGCLIOBJ=$(DEBUGCLISRCS:.c=.o)
CLISRCS=cli.c debugcmd.c
CLIOBJ=$(CLISRCS:.c=.o)
WEBSRCS=webserver.o
WEBOBJ=$(WEBSRCS:.c=.o)
SRCS=$(TESTSRCS) $(READCONSRCS) $(DEBUGCLISRCS) cli.c
OBJ=$(SRCS:.c=.o)
LIBS=$(UTILSPATH)/skygw_utils.o -lssl -llog_manager
MODULES= libdebugcli.so libreadconnroute.so libtestroute.so libcli.so \
libwebserver.so
MODULES= libdebugcli.so libreadconnroute.so libtestroute.so libcli.so
all: $(MODULES)
@ -67,9 +64,6 @@ libdebugcli.so: $(DEBUGCLIOBJ)
libcli.so: $(CLIOBJ)
$(CC) $(LDFLAGS) $(CLIOBJ) $(LIBS) -o $@
libwebserver.so: $(WEBOBJ)
$(CC) $(LDFLAGS) $(WEBOBJ) $(LIBS) -o $@
libreadwritesplit.so:
# (cd readwritesplit; touch depend.mk ; make; cp $@ ..)
@ -77,7 +71,7 @@ libreadwritesplit.so:
$(CC) $(CFLAGS) $< -o $@
clean:
rm -f $(OBJ) $(MODULES)
$(DEL) $(OBJ) $(MODULES)
(cd readwritesplit; touch depend.mk; make clean)
tags:
@ -85,7 +79,7 @@ tags:
(cd readwritesplit; make tags)
depend:
@rm -f depend.mk
@$(DEL) depend.mk
cc -M $(CFLAGS) $(SRCS) > depend.mk
(cd readwritesplit; touch depend.mk ; make depend)

View File

@ -50,13 +50,13 @@ libreadwritesplit.so: $(OBJ)
$(CC) $(CFLAGS) $< -o $@
clean:
rm -f $(OBJ) $(MODULES)
$(DEL) $(OBJ) $(MODULES)
tags:
ctags $(SRCS) $(HDRS)
depend:
@rm -f depend.mk
@$(DEL) depend.mk
cc -M $(CFLAGS) $(SRCS) > depend.mk
install: $(MODULES)

View File

@ -31,6 +31,7 @@
#include <dcb.h>
#include <spinlock.h>
#include <modinfo.h>
#include <modutil.h>
#include <mysql_client_server_protocol.h>
MODULE_INFO info = {
@ -280,6 +281,47 @@ static bool have_enough_servers(
static SPINLOCK instlock;
static ROUTER_INSTANCE* instances;
static int hashkeyfun(void* key);
static int hashcmpfun (void *, void *);
static int hashkeyfun(
void* key)
{
if(key == NULL){
return 0;
}
unsigned int hash = 0,c = 0;
char* ptr = (char*)key;
while((c = *ptr++)){
hash = c + (hash << 6) + (hash << 16) - hash;
}
return *(int *)key;
}
static int hashcmpfun(
void* v1,
void* v2)
{
char* i1 = (char*) v1;
char* i2 = (char*) v2;
return strcmp(i1,i2);
}
static void* hstrdup(void* fval)
{
char* str = (char*)fval;
return strdup(str);
}
static void* hfree(void* fval)
{
free (fval);
return NULL;
}
/**
* Implementation of the mandatory version entry point
*
@ -711,7 +753,7 @@ static void* newSession(
backend_ref[i].bref_sescmd_cur.scmd_cur_ptr_property =
&client_rses->rses_properties[RSES_PROP_TYPE_SESCMD];
backend_ref[i].bref_sescmd_cur.scmd_cur_cmd = NULL;
}
}
max_nslaves = rses_get_max_slavecount(client_rses, router_nservers);
max_slave_rlag = rses_get_max_replication_lag(client_rses);
@ -956,76 +998,74 @@ static bool get_dcb(
/** get root master from available servers */
master_host = get_root_master(backend_ref, rses->rses_nbackends);
if (name != NULL) /*< Choose backend by name from a hint */
{
ss_dassert(btype != BE_MASTER); /*< Master dominates and no name should be passed with it */
for (i=0; i<rses->rses_nbackends; i++)
{
BACKEND* b = backend_ref[i].bref_backend;
/**
* To become chosen:
* backend must be in use, name must match,
* root master node must be found,
* backend's role must be either slave, relay
* server, or master.
*/
if (BREF_IS_IN_USE((&backend_ref[i])) &&
(strncasecmp(
name,
b->backend_server->unique_name,
PATH_MAX) == 0) &&
master_host != NULL &&
(SERVER_IS_SLAVE(b->backend_server) ||
SERVER_IS_RELAY_SERVER(b->backend_server) ||
SERVER_IS_MASTER(b->backend_server)))
{
*p_dcb = backend_ref[i].bref_dcb;
succp = true;
ss_dassert(backend_ref[i].bref_dcb->state != DCB_STATE_ZOMBIE);
break;
}
}
if (succp)
{
goto return_succp;
}
}
if (btype == BE_SLAVE)
{
if (name != NULL) /*< Choose backend by name (hint) */
{
for (i=0; i<rses->rses_nbackends; i++)
{
BACKEND* b = backend_ref[i].bref_backend;
/**
* To become chosen:
* backend must be in use, name must match,
* root master node must be found,
* backend's role must be either slave, relay
* server, or master.
*/
if (BREF_IS_IN_USE((&backend_ref[i])) &&
(strncasecmp(
name,
b->backend_server->unique_name,
MIN(strlen(b->backend_server->unique_name), PATH_MAX)) == 0) &&
master_host != NULL &&
#if 0
(max_rlag == MAX_RLAG_UNDEFINED ||
(b->backend_server->rlag != MAX_RLAG_NOT_AVAILABLE &&
b->backend_server->rlag <= max_rlag)) &&
#endif
(SERVER_IS_SLAVE(b->backend_server) ||
SERVER_IS_RELAY_SERVER(b->backend_server) ||
SERVER_IS_MASTER(b->backend_server)))
{
*p_dcb = backend_ref[i].bref_dcb;
succp = true;
ss_dassert(backend_ref[i].bref_dcb->state != DCB_STATE_ZOMBIE);
break;
}
}
}
if (!succp) /*< No hints or finding named backend failed */
{
for (i=0; i<rses->rses_nbackends; i++)
{
BACKEND* b = backend_ref[i].bref_backend;
/**
* To become chosen:
* backend must be in use,
* root master node must be found,
* backend is not allowed to be the master,
* backend's role can be either slave or relay
* server and it must have least connections
* at the moment.
*/
if (BREF_IS_IN_USE((&backend_ref[i])) &&
master_host != NULL &&
b->backend_server != master_host->backend_server &&
(max_rlag == MAX_RLAG_UNDEFINED ||
(b->backend_server->rlag != MAX_RLAG_NOT_AVAILABLE &&
b->backend_server->rlag <= max_rlag)) &&
(SERVER_IS_SLAVE(b->backend_server) ||
SERVER_IS_RELAY_SERVER(b->backend_server)) &&
(smallest_nconn == -1 ||
b->backend_conn_count < smallest_nconn))
{
*p_dcb = backend_ref[i].bref_dcb;
smallest_nconn = b->backend_conn_count;
succp = true;
ss_dassert(backend_ref[i].bref_dcb->state != DCB_STATE_ZOMBIE);
}
}
}
for (i=0; i<rses->rses_nbackends; i++)
{
BACKEND* b = backend_ref[i].bref_backend;
/**
* To become chosen:
* backend must be in use,
* root master node must be found,
* backend is not allowed to be the master,
* backend's role can be either slave or relay
* server and it must have least connections
* at the moment.
*/
if (BREF_IS_IN_USE((&backend_ref[i])) &&
master_host != NULL &&
b->backend_server != master_host->backend_server &&
(max_rlag == MAX_RLAG_UNDEFINED ||
(b->backend_server->rlag != MAX_RLAG_NOT_AVAILABLE &&
b->backend_server->rlag <= max_rlag)) &&
(SERVER_IS_SLAVE(b->backend_server) ||
SERVER_IS_RELAY_SERVER(b->backend_server)) &&
(smallest_nconn == -1 ||
b->backend_conn_count < smallest_nconn))
{
*p_dcb = backend_ref[i].bref_dcb;
smallest_nconn = b->backend_conn_count;
succp = true;
ss_dassert(backend_ref[i].bref_dcb->state != DCB_STATE_ZOMBIE);
}
}
if (!succp) /*< No valid slave was found, search master next */
{
@ -1065,7 +1105,7 @@ return_succp:
* Examine the query type, transaction state and routing hints. Find out the
* target for query routing.
*
* @param qtype Type of query
* @param qtype Type of query
* @param trx_active Is transacation active or not
* @param hint Pointer to list of hints attached to the query buffer
*
@ -1086,10 +1126,22 @@ static route_target_t get_route_target (
/** hints don't affect on routing */
target = TARGET_ALL;
}
else if (QUERY_IS_TYPE(qtype, QUERY_TYPE_READ) && !trx_active)
/**
* Read-only statements to slave or to master can be re-routed after
* the hints
*/
else if ((QUERY_IS_TYPE(qtype, QUERY_TYPE_READ) ||
QUERY_IS_TYPE(qtype, QUERY_TYPE_SESSION_READ)) &&
!trx_active)
{
target = TARGET_SLAVE;
if (QUERY_IS_TYPE(qtype, QUERY_TYPE_READ))
{
target = TARGET_SLAVE;
}
else
{
target = TARGET_MASTER;
}
/** process routing hints */
while (hint != NULL)
{
@ -1103,8 +1155,15 @@ static route_target_t get_route_target (
}
else if (hint->type == HINT_ROUTE_TO_NAMED_SERVER)
{
target |= TARGET_NAMED_SERVER; /*< add */
}
/**
* Searching for a named server. If it can't be
* found, the oroginal target is chosen.
*/
target |= TARGET_NAMED_SERVER;
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Hint: route to named server : ")));
}
else if (hint->type == HINT_ROUTE_TO_UPTODATE_SERVER)
{
/** not implemented */
@ -1140,6 +1199,7 @@ static route_target_t get_route_target (
}
else if (hint->type == HINT_ROUTE_TO_SLAVE)
{
target = TARGET_SLAVE;
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Hint: route to slave.")));
@ -1191,15 +1251,27 @@ static int routeQuery(
char* startpos;
mysql_server_cmd_t packet_type;
uint8_t* packet;
int ret = 0;
int ret = 0;
int tsize = 0;
int klen = 0;
int i = 0;
DCB* master_dcb = NULL;
DCB* target_dcb = NULL;
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance;
ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
rses_property_t* rses_prop_tmp;
bool rses_is_closed = false;
bool target_tmp_table = false;
char* dbname;
char* hkey;
char** tbl;
HASHTABLE* h;
MYSQL_session* data;
size_t len;
MYSQL* mysql = NULL;
route_target_t route_target;
bool succp = false;
int rlag_max = MAX_RLAG_UNDEFINED;
backend_type_t btype; /*< target backend type */
CHK_CLIENT_RSES(router_cli_ses);
@ -1213,6 +1285,7 @@ static int routeQuery(
packet = GWBUF_DATA(querybuf);
packet_type = packet[4];
rses_prop_tmp = router_cli_ses->rses_properties[RSES_PROP_TYPE_TMPTABLES];
if (rses_is_closed)
{
@ -1222,6 +1295,8 @@ static int routeQuery(
*/
if (packet_type != MYSQL_COM_QUIT)
{
char* query_str = modutil_get_query(querybuf);
LOGIF(LE,
(skygw_log_write_flush(
LOGFILE_ERROR,
@ -1229,18 +1304,21 @@ static int routeQuery(
"backend server. %s.",
STRPACKETTYPE(packet_type),
STRQTYPE(qtype),
(querystr == NULL ? "(empty)" : querystr),
(query_str == NULL ? "(empty)" : query_str),
(rses_is_closed ? "Router was closed" :
"Router has no backend servers where to "
"route to"))));
free(querybuf);
}
goto return_ret;
goto retblock;
}
inst->stats.n_queries++;
startpos = (char *)&packet[5];
master_dcb = router_cli_ses->rses_master_ref->bref_dcb;
CHK_DCB(master_dcb);
data = (MYSQL_session*)master_dcb->session->data;
dbname = data->db;
switch(packet_type) {
case MYSQL_COM_QUIT: /*< 1 QUIT will close all sessions */
@ -1261,44 +1339,16 @@ static int routeQuery(
break;
case MYSQL_COM_QUERY:
plainsqlbuf = gwbuf_clone_transform(querybuf,
GWBUF_TYPE_PLAINSQL);
len = GWBUF_LENGTH(plainsqlbuf);
/** unnecessary if buffer includes additional terminating null */
querystr = (char *)malloc(len+1);
memcpy(querystr, startpos, len);
memset(&querystr[len], 0, 1);
/**
* Use mysql handle to query information from parse tree.
* call skygw_query_classifier_free before exit!
*/
qtype = skygw_query_classifier_get_type(querystr, 0, &mysql);
qtype = query_classifier_get_type(querybuf);
break;
case MYSQL_COM_STMT_PREPARE:
plainsqlbuf = gwbuf_clone_transform(querybuf,
GWBUF_TYPE_PLAINSQL);
len = GWBUF_LENGTH(plainsqlbuf);
/** unnecessary if buffer includes additional terminating null */
querystr = (char *)malloc(len+1);
memcpy(querystr, startpos, len);
memset(&querystr[len], 0, 1);
qtype = skygw_query_classifier_get_type(querystr, 0, &mysql);
qtype = query_classifier_get_type(querybuf);
qtype |= QUERY_TYPE_PREPARE_STMT;
break;
case MYSQL_COM_STMT_EXECUTE:
/** Parsing is not needed for this type of packet */
#if defined(NOT_USED)
plainsqlbuf = gwbuf_clone_transform(querybuf,
GWBUF_TYPE_PLAINSQL);
len = GWBUF_LENGTH(plainsqlbuf);
/** unnecessary if buffer includes additional terminating null */
querystr = (char *)malloc(len+1);
memcpy(querystr, startpos, len);
memset(&querystr[len], 0, 1);
qtype = skygw_query_classifier_get_type(querystr, 0, &mysql);
#endif
qtype = QUERY_TYPE_EXEC_STMT;
break;
@ -1314,6 +1364,48 @@ static int routeQuery(
break;
} /**< switch by packet type */
/**
* Check if the query targets a temporary table
*/
if (QUERY_IS_TYPE(qtype, QUERY_TYPE_READ))
{
tbl = skygw_get_table_names(querybuf,&tsize);
if (tsize > 0)
{
/** Query targets at least one table */
for(i = 0; i<tsize && !target_tmp_table && tbl[i]; i++)
{
klen = strlen(dbname) + strlen(tbl[i]) + 2;
hkey = calloc(klen,sizeof(char));
strcpy(hkey,dbname);
strcat(hkey,".");
strcat(hkey,tbl[0]);
if (rses_prop_tmp &&
rses_prop_tmp->rses_prop_data.temp_tables)
{
if( (target_tmp_table =
(bool)hashtable_fetch(rses_prop_tmp->rses_prop_data.temp_tables,(void *)hkey)))
{
/**Query target is a temporary table*/
qtype = QUERY_TYPE_READ_TMP_TABLE;
LOGIF(LT,
(skygw_log_write(LOGFILE_TRACE,
"Query targets a temporary table: %s",hkey)));
}
}
free(hkey);
}
for (i = 0; i<tsize; i++)
{
free(tbl[i]);
}
free(tbl);
}
}
/**
* If autocommit is disabled or transaction is explicitly started
* transaction becomes active and master gets all statements until
@ -1350,6 +1442,118 @@ static int routeQuery(
router_cli_ses->rses_autocommit_enabled = true;
router_cli_ses->rses_transaction_active = false;
}
/**
* If query is of type QUERY_TYPE_CREATE_TMP_TABLE then find out
* the database and table name, create a hashvalue and
* add it to the router client session's property. If property
* doesn't exist then create it first. If the query is DROP TABLE...
* then see if it targets a temporary table and remove it from the hashtable
* if it does.
*/
if (QUERY_IS_TYPE(qtype, QUERY_TYPE_CREATE_TMP_TABLE))
{
bool is_temp = true;
char* tblname = NULL;
tblname = skygw_get_created_table_name(querybuf);
if (tblname && strlen(tblname) > 0)
{
klen = strlen(dbname) + strlen(tblname) + 2;
hkey = calloc(klen,sizeof(char));
strcpy(hkey,dbname);
strcat(hkey,".");
strcat(hkey,tblname);
}
else
{
hkey = NULL;
}
if(rses_prop_tmp == NULL)
{
if((rses_prop_tmp =
(rses_property_t*)calloc(1,sizeof(rses_property_t))))
{
#if defined(SS_DEBUG)
rses_prop_tmp->rses_prop_chk_top = CHK_NUM_ROUTER_PROPERTY;
rses_prop_tmp->rses_prop_chk_tail = CHK_NUM_ROUTER_PROPERTY;
#endif
rses_prop_tmp->rses_prop_rsession = router_cli_ses;
rses_prop_tmp->rses_prop_refcount = 1;
rses_prop_tmp->rses_prop_next = NULL;
rses_prop_tmp->rses_prop_type = RSES_PROP_TYPE_TMPTABLES;
router_cli_ses->rses_properties[RSES_PROP_TYPE_TMPTABLES] = rses_prop_tmp;
}
}
if (rses_prop_tmp->rses_prop_data.temp_tables == NULL)
{
h = hashtable_alloc(7, hashkeyfun, hashcmpfun);
hashtable_memory_fns(h,hstrdup,NULL,hfree,NULL);
if (h != NULL)
{
rses_prop_tmp->rses_prop_data.temp_tables = h;
}
}
if (hkey &&
hashtable_add(rses_prop_tmp->rses_prop_data.temp_tables,
(void *)hkey,
(void *)is_temp) == 0) /*< Conflict in hash table */
{
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Temporary table conflict in hashtable: %s",
hkey)));
}
#if defined(SS_DEBUG)
{
bool retkey =
hashtable_fetch(
rses_prop_tmp->rses_prop_data.temp_tables,
hkey);
if (retkey)
{
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Temporary table added: %s",
hkey)));
}
}
#endif
free(hkey);
}
/** Check if DROP TABLE... targets a temporary table */
if (is_drop_table_query(querybuf))
{
tbl = skygw_get_table_names(querybuf,&tsize);
for(i = 0; i<tsize; i++)
{
klen = strlen(dbname) + strlen(tbl[i]) + 2;
hkey = calloc(klen,sizeof(char));
strcpy(hkey,dbname);
strcat(hkey,".");
strcat(hkey,tbl[i]);
if (rses_prop_tmp &&
rses_prop_tmp->rses_prop_data.temp_tables)
{
if (hashtable_delete(rses_prop_tmp->rses_prop_data.temp_tables,
(void *)hkey))
{
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE,
"Temporary table dropped: %s",hkey)));
}
}
free(tbl[i]);
free(hkey);
}
free(tbl);
}
/**
* Find out where to route the query. Result may not be clear; it is
* possible to have a hint for routing to a named server which can
@ -1357,161 +1561,193 @@ static int routeQuery(
* If query would otherwise be routed to slave then the hint determines
* actual target server if it exists.
*
* route_target is a bitfield and may include multiple values.
* route_target is a bitfield and may include :
* TARGET_ALL
* - route to all connected backend servers
* TARGET_SLAVE[|TARGET_NAMED_SERVER|TARGET_RLAG_MAX]
* - route primarily according to hints, then to slave and if those
* failed, eventually to master
* TARGET_MASTER[|TARGET_NAMED_SERVER|TARGET_RLAG_MAX]
* - route primarily according to the hints and if they failed,
* eventually to master
*/
route_target = get_route_target(qtype,
router_cli_ses->rses_transaction_active,
querybuf->hint);
if (TARGET_IS_ALL(route_target))
{
/**
* It is not sure if the session command in question requires
* response. Statement is examined in route_session_write.
*/
bool succp = route_session_write(
router_cli_ses,
querybuf,
inst,
packet_type,
qtype);
if (succp)
{
ret = 1;
}
goto return_ret;
}
/**
* Handle routing to master and to slave
*/
else
if (TARGET_IS_ALL(route_target))
{
/**
* It is not sure if the session command in question requires
* response. Statement is examined in route_session_write.
* Router locking is done inside the function.
*/
succp = route_session_write(router_cli_ses,
gwbuf_clone(querybuf),
inst,
packet_type,
qtype);
if (succp)
{
ret = 1;
}
goto retblock;
}
/** Lock router session */
if (!rses_begin_locked_router_action(router_cli_ses))
{
goto retblock;
}
/**
* There is a hint which either names the target backend or
* hint which sets maximum allowed replication lag for the
* backend.
*/
if (TARGET_IS_NAMED_SERVER(route_target) ||
TARGET_IS_RLAG_MAX(route_target))
{
HINT* hint;
char* named_server = NULL;
hint = querybuf->hint;
while (hint != NULL)
{
if (hint->type == HINT_ROUTE_TO_NAMED_SERVER)
{
/**
* Set the name of searched
* backend server.
*/
named_server = hint->data;
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Hint: route to server "
"'%s'",
named_server)));
}
else if (hint->type == HINT_PARAMETER &&
(strncasecmp((char *)hint->data,
"max_slave_replication_lag",
strlen("max_slave_replication_lag")) == 0))
{
int val = (int) strtol((char *)hint->value,
(char **)NULL, 10);
if (val != 0 || errno == 0)
{
/**
* Set max. acceptable
* replication lag
* value for backend srv
*/
rlag_max = val;
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Hint: "
"max_slave_replication_lag=%d",
rlag_max)));
}
}
hint = hint->next;
} /*< while */
if (rlag_max == MAX_RLAG_UNDEFINED) /*< no rlag max hint, use config */
{
rlag_max = rses_get_max_replication_lag(router_cli_ses);
}
btype = BE_UNDEFINED; /*< target may be master or slave */
/**
* Search backend server by name or replication lag.
* If it fails, then try to find valid slave or master.
*/
succp = get_dcb(&target_dcb,
router_cli_ses,
btype,
named_server,
rlag_max);
}
if (!succp && TARGET_IS_SLAVE(route_target))
{
btype = BE_SLAVE;
if (rlag_max == MAX_RLAG_UNDEFINED) /*< no rlag max hint, use config */
{
rlag_max = rses_get_max_replication_lag(router_cli_ses);
}
/**
* Search suitable backend server, get DCB in target_dcb
*/
succp = get_dcb(&target_dcb,
router_cli_ses,
BE_SLAVE,
NULL,
rlag_max);
}
if (!succp && TARGET_IS_MASTER(route_target))
{
if (master_dcb == NULL)
{
succp = get_dcb(&master_dcb,
router_cli_ses,
BE_MASTER,
NULL,
MAX_RLAG_UNDEFINED);
}
else
{
succp = true;
}
target_dcb = master_dcb;
}
ss_dassert(succp);
if (succp) /*< Have DCB of the target backend */
{
if ((ret = target_dcb->func.write(target_dcb, gwbuf_clone(querybuf))) == 1)
{
backend_ref_t* bref;
atomic_add(&inst->stats.n_slave, 1);
/**
* Add one query response waiter to backend reference
*/
bref = get_bref_from_dcb(router_cli_ses, target_dcb);
bref_set_state(bref, BREF_QUERY_ACTIVE);
bref_set_state(bref, BREF_WAITING_RESULT);
}
else
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Routing query \"%s\" failed.",
querystr)));
}
}
rses_end_locked_router_action(router_cli_ses);
retblock:
#if defined(SS_DEBUG)
{
bool succp = true;
HINT* hint;
char* named_server = NULL;
int rlag_max = MAX_RLAG_UNDEFINED;
char* canonical_query_str;
if (router_cli_ses->rses_transaction_active) /*< all to master */
canonical_query_str = skygw_get_canonical(querybuf);
if (canonical_query_str != NULL)
{
route_target = TARGET_MASTER; /*< override old value */
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Transaction is active, routing to Master.")));
"Canonical version: %s",
canonical_query_str)));
free(canonical_query_str);
}
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, "%s", STRQTYPE(qtype))));
/** Lock router session */
if (!rses_begin_locked_router_action(router_cli_ses))
{
goto return_ret;
}
if (TARGET_IS_SLAVE(route_target))
{
if (TARGET_IS_NAMED_SERVER(route_target) ||
TARGET_IS_RLAG_MAX(route_target))
{
hint = querybuf->hint;
while (hint != NULL)
{
if (hint->type == HINT_ROUTE_TO_NAMED_SERVER)
{
named_server = hint->data;
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Hint: route to server "
"'%s'",
named_server)));
}
else if (hint->type == HINT_PARAMETER &&
(strncasecmp(
(char *)hint->data,
"max_slave_replication_lag",
strlen("max_slave_replication_lag")) == 0))
{
int val = (int) strtol((char *)hint->value,
(char **)NULL, 10);
if (val != 0 || errno == 0)
{
rlag_max = val;
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Hint: "
"max_slave_replication_lag=%d",
rlag_max)));
}
}
hint = hint->next;
}
}
if (rlag_max == MAX_RLAG_UNDEFINED) /*< no rlag max hint, use config */
{
rlag_max = rses_get_max_replication_lag(router_cli_ses);
}
succp = get_dcb(&target_dcb,
router_cli_ses,
BE_SLAVE,
named_server,
rlag_max);
}
else if (TARGET_IS_MASTER(route_target))
{
if (master_dcb == NULL)
{
succp = get_dcb(&master_dcb,
router_cli_ses,
BE_MASTER,
NULL,
MAX_RLAG_UNDEFINED);
}
target_dcb = master_dcb;
}
if (succp) /*< Have DCB of the target backend */
{
if ((ret = target_dcb->func.write(target_dcb, querybuf)) == 1)
{
backend_ref_t* bref;
atomic_add(&inst->stats.n_slave, 1);
/**
* Add one query response waiter to backend reference
*/
bref = get_bref_from_dcb(router_cli_ses, target_dcb);
bref_set_state(bref, BREF_QUERY_ACTIVE);
bref_set_state(bref, BREF_WAITING_RESULT);
}
else
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Routing query \"%s\" failed.",
querystr)));
}
}
rses_end_locked_router_action(router_cli_ses);
}
return_ret:
if (plainsqlbuf != NULL)
{
gwbuf_free(plainsqlbuf);
}
if (querystr != NULL)
{
free(querystr);
}
if (mysql != NULL)
{
skygw_query_classifier_free(mysql);
}
#endif
gwbuf_free(querybuf);
return ret;
}
@ -1725,8 +1961,10 @@ static void clientReply (
(uint8_t *)GWBUF_DATA((scur->scmd_cur_cmd->my_sescmd_buf));
size_t len = MYSQL_GET_PACKET_LEN(buf);
char* cmdstr = (char *)malloc(len+1);
/** data+termination character == len */
snprintf(cmdstr, len, "%s", &buf[5]);
snprintf(cmdstr, len+1, "%s", &buf[5]);
ss_dassert(len+4 == GWBUF_LENGTH(scur->scmd_cur_cmd->my_sescmd_buf));
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
@ -2457,6 +2695,11 @@ static void rses_property_done(
case RSES_PROP_TYPE_SESCMD:
mysql_sescmd_done(&prop->rses_prop_data.sescmd);
break;
case RSES_PROP_TYPE_TMPTABLES:
hashtable_free(prop->rses_prop_data.temp_tables);
break;
default:
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
@ -2856,8 +3099,23 @@ static bool execute_sescmd_in_backend(
sescmd_cursor_clone_querybuf(scur));
break;
case MYSQL_COM_QUERY:
case MYSQL_COM_INIT_DB:
case MYSQL_COM_INIT_DB:
{
/**
* Record database name and store to session.
*/
GWBUF* tmpbuf;
MYSQL_session* data;
unsigned int qlen;
data = dcb->session->data;
tmpbuf = scur->scmd_cur_cmd->my_sescmd_buf;
qlen = MYSQL_GET_PACKET_LEN((unsigned char*)tmpbuf->start);
memset(data->db,0,MYSQL_DATABASE_MAXLEN+1);
strncpy(data->db,tmpbuf->start+5,qlen - 1);
}
/** Fallthrough */
case MYSQL_COM_QUERY:
default:
/**
* Mark session command buffer, it triggers writing
@ -3142,7 +3400,7 @@ static bool route_session_write(
{
succp = false;
goto return_succp;
}
}
/**
* Additional reference is created to querybuf to
* prevent it from being released before properties
@ -3831,4 +4089,3 @@ static BACKEND *get_root_master(backend_ref_t *servers, int router_nservers) {
}
return master_host;
}

View File

@ -230,3 +230,65 @@ if [ "$a" != "$TRETVAL" ]; then
else
echo "$TINPUT PASSED">>$TLOG ;
fi
TINPUT=test_temporary_table.sql
a=`$RUNCMD < ./$TINPUT`
TRETVAL=1
if [ "$a" != "$TRETVAL" ]; then
echo "$TINPUT FAILED, return value $a when $TRETVAL was expected">>$TLOG;
else
echo "$TINPUT PASSED">>$TLOG ;
fi
echo "-----------------------------------" >> $TLOG
echo "Session variables: Stress Test 1" >> $TLOG
echo "-----------------------------------" >> $TLOG
RUNCMD=mysql\ --host=$THOST\ -P$TPORT\ -u$TUSER\ -p$TPWD\ --unbuffered=true\ --disable-reconnect\ -q\ -r
TINPUT=test_sescmd2.sql
for ((i = 0;i<1000;i++))
do
if [[ $(( i % 50 )) -eq 0 ]]
then
printf "."
fi
a=`$RUNCMD < $TINPUT 2>&1`
if [[ "`echo "$a"|grep -i 'error'`" != "" ]]
then
err=`echo "$a" | grep -i error`
break
fi
done
if [[ "$err" == "" ]]
then
echo "TEST PASSED" >> $TLOG
else
echo "$err" >> $TLOG
echo "Test FAILED at iteration $((i+1))" >> $TLOG
fi
echo "-----------------------------------" >> $TLOG
echo "Session variables: Stress Test 2" >> $TLOG
echo "-----------------------------------" >> $TLOG
echo ""
err=""
TINPUT=test_sescmd3.sql
for ((j = 0;j<1000;j++))
do
if [[ $(( j % 50 )) -eq 0 ]]
then
printf "."
fi
b=`$RUNCMD < $TINPUT 2>&1`
if [[ "`echo "$b"|grep -i 'null|error'`" != "" ]]
then
err=`echo "$b" | grep -i null|error`
break
fi
done
if [[ "$err" == "" ]]
then
echo "TEST PASSED" >> $TLOG
else
echo "Test FAILED at iteration $((j+1))" >> $TLOG
fi
echo "" >> $TLOG

View File

@ -1,15 +0,0 @@
-------------------------------
Tue Aug 5 13:31:41 EEST 2014
Test Read/Write split router - hint routing
-------------------------------
-- maxscale route to master FAILED, return value 3003 when 3000 was expected
-- maxscale route to server server1 FAILED, return value 3003 when 3000 was expected
-- maxscale route to server server2 FAILED, return value 3003 when 3001 was expected
-- maxscale route to server server3 FAILED, return value 3003 when 3002 was expected
-- maxscale route to server server4 PASSED
-- maxscale max_slave_replication_lag = 100 FAILED, return value 3003 when was expected
-- maxscale max_slave_replication_lag= 100 FAILED, return value 3003 when was expected
-- maxscale max_slave_replication_lag =100 FAILED, return value 3003 when was expected
-- maxscale max_slave_replication_lag=100 FAILED, return value 3003 when was expected

View File

@ -0,0 +1,20 @@
/*!40101 SET @OLD_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT */;
/*!40101 SET @OLD_CHARACTER_SET_RESULTS=@@CHARACTER_SET_RESULTS */;
/*!40101 SET @OLD_COLLATION_CONNECTION=@@COLLATION_CONNECTION */;
/*!40101 SET NAMES utf8 */;
/*!40103 SET @OLD_TIME_ZONE=@@TIME_ZONE */;
/*!40103 SET TIME_ZONE='+00:00' */;
/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
/*!40103 SET TIME_ZONE=@OLD_TIME_ZONE */;
/*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
/*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
/*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
/*!40101 SET CHARACTER_SET_CLIENT=@OLD_CHARACTER_SET_CLIENT */;
/*!40101 SET CHARACTER_SET_RESULTS=@OLD_CHARACTER_SET_RESULTS */;
/*!40101 SET COLLATION_CONNECTION=@OLD_COLLATION_CONNECTION */;
/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;

View File

@ -0,0 +1,16 @@
/*!40101 SET @OLD_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT */;
/*!40101 SET @OLD_CHARACTER_SET_RESULTS=@@CHARACTER_SET_RESULTS */;
/*!40101 SET @OLD_COLLATION_CONNECTION=@@COLLATION_CONNECTION */;
/*!40103 SET @OLD_TIME_ZONE=@@TIME_ZONE */;
/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
select @OLD_SQL_NOTES;
select @OLD_CHARACTER_SET_CLIENT;
select @OLD_CHARACTER_SET_RESULTS;
select @OLD_COLLATION_CONNECTION;
select @OLD_TIME_ZONE;
select @OLD_UNIQUE_CHECKS;
select @OLD_FOREIGN_KEY_CHECKS;
select @OLD_SQL_MODE;

View File

@ -0,0 +1,5 @@
use test;
drop table if exists t1;
create temporary table t1 (id integer);
insert into t1 values(1);
select id from t1;