MAX-157. Added support for hints in rwsplit router.
buffer.c: added memory release for hint of a GWBUF hint.c: added bool hint_exists() hint.h: added placeholder for hint type HINT_ROUTE_TO_ALL which doesn't have implementation yet. filter/Makefile: fixed dependency issue hintparser.c: added const char* token_get_keyword(), hint_parser:added NULL check, hint_next_token: fixed off-by-one bug readwritesplit.h: added bitfield for hints' use, which includes route targets and flag for case where user hinted to route to named backend server. readwritesplit.c: added function route_target_t get_route_target() for resolving route target based on 1) query type (from query_classifier) 2) transaction state (active/not) and 3) hints. Modified get_dcb, which is called in routeQuery to provide pointer to correct backend DCB. Now get_dcb also takes server unique name as a parameter if such a hint was found. for hints' use, which includes enter the commit message for your changes.
This commit is contained in:
@ -464,7 +464,7 @@ static skygw_query_type_t resolve_query_type(
|
|||||||
type |= QUERY_TYPE_DISABLE_AUTOCOMMIT;
|
type |= QUERY_TYPE_DISABLE_AUTOCOMMIT;
|
||||||
type |= QUERY_TYPE_BEGIN_TRX;
|
type |= QUERY_TYPE_BEGIN_TRX;
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
* REVOKE ALL, ASSIGN_TO_KEYCACHE,
|
* REVOKE ALL, ASSIGN_TO_KEYCACHE,
|
||||||
* PRELOAD_KEYS, FLUSH, RESET, CREATE|ALTER|DROP SERVER
|
* PRELOAD_KEYS, FLUSH, RESET, CREATE|ALTER|DROP SERVER
|
||||||
*/
|
*/
|
||||||
|
|||||||
@ -116,6 +116,12 @@ BUF_PROPERTY *prop;
|
|||||||
free(prop->value);
|
free(prop->value);
|
||||||
free(prop);
|
free(prop);
|
||||||
}
|
}
|
||||||
|
while (buf->hint)
|
||||||
|
{
|
||||||
|
HINT* h = buf->hint;
|
||||||
|
buf->hint = buf->hint->next;
|
||||||
|
hint_free(h);
|
||||||
|
}
|
||||||
free(buf);
|
free(buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -145,6 +151,7 @@ GWBUF *rval;
|
|||||||
rval->end = buf->end;
|
rval->end = buf->end;
|
||||||
rval->gwbuf_type = buf->gwbuf_type;
|
rval->gwbuf_type = buf->gwbuf_type;
|
||||||
rval->properties = NULL;
|
rval->properties = NULL;
|
||||||
|
rval->hint = NULL;
|
||||||
rval->next = NULL;
|
rval->next = NULL;
|
||||||
CHK_GWBUF(rval);
|
CHK_GWBUF(rval);
|
||||||
return rval;
|
return rval;
|
||||||
@ -172,6 +179,7 @@ GWBUF *gwbuf_clone_portion(
|
|||||||
clonebuf->end = (void *)((char *)clonebuf->start)+length;
|
clonebuf->end = (void *)((char *)clonebuf->start)+length;
|
||||||
clonebuf->gwbuf_type = buf->gwbuf_type; /*< clone the type for now */
|
clonebuf->gwbuf_type = buf->gwbuf_type; /*< clone the type for now */
|
||||||
clonebuf->properties = NULL;
|
clonebuf->properties = NULL;
|
||||||
|
clonebuf->hint = NULL;
|
||||||
clonebuf->next = NULL;
|
clonebuf->next = NULL;
|
||||||
CHK_GWBUF(clonebuf);
|
CHK_GWBUF(clonebuf);
|
||||||
return clonebuf;
|
return clonebuf;
|
||||||
|
|||||||
@ -134,3 +134,20 @@ hint_free(HINT *hint)
|
|||||||
free(hint->value);
|
free(hint->value);
|
||||||
free(hint);
|
free(hint);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool hint_exists(
|
||||||
|
HINT** p_hint,
|
||||||
|
HINT_TYPE type)
|
||||||
|
{
|
||||||
|
bool succp = false;
|
||||||
|
|
||||||
|
while (*p_hint != NULL)
|
||||||
|
{
|
||||||
|
if ((*p_hint)->type == type)
|
||||||
|
{
|
||||||
|
succp = true;
|
||||||
|
}
|
||||||
|
p_hint = &(*p_hint)->next;
|
||||||
|
}
|
||||||
|
return succp;
|
||||||
|
}
|
||||||
@ -30,6 +30,9 @@
|
|||||||
* @endverbatim
|
* @endverbatim
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#include <skygw_debug.h>
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The types of hint that are supported by the generic hinting mechanism.
|
* The types of hint that are supported by the generic hinting mechanism.
|
||||||
*/
|
*/
|
||||||
@ -38,6 +41,7 @@ typedef enum {
|
|||||||
HINT_ROUTE_TO_SLAVE,
|
HINT_ROUTE_TO_SLAVE,
|
||||||
HINT_ROUTE_TO_NAMED_SERVER,
|
HINT_ROUTE_TO_NAMED_SERVER,
|
||||||
HINT_ROUTE_TO_UPTODATE_SERVER,
|
HINT_ROUTE_TO_UPTODATE_SERVER,
|
||||||
|
HINT_ROUTE_TO_ALL, /*< not implemented yet */
|
||||||
HINT_PARAMETER
|
HINT_PARAMETER
|
||||||
} HINT_TYPE;
|
} HINT_TYPE;
|
||||||
|
|
||||||
@ -50,7 +54,7 @@ typedef enum {
|
|||||||
*/
|
*/
|
||||||
typedef struct hint {
|
typedef struct hint {
|
||||||
HINT_TYPE type; /*< The Type of hint */
|
HINT_TYPE type; /*< The Type of hint */
|
||||||
void *data; /*< Type sepecific data */
|
void *data; /*< Type specific data */
|
||||||
void *value; /*< Parameter value for hint */
|
void *value; /*< Parameter value for hint */
|
||||||
unsigned int dsize; /*< Size of the hint data */
|
unsigned int dsize; /*< Size of the hint data */
|
||||||
struct hint *next; /*< Another hint for this buffer */
|
struct hint *next; /*< Another hint for this buffer */
|
||||||
@ -61,4 +65,5 @@ extern HINT *hint_create_parameter(HINT *, char *, char *);
|
|||||||
extern HINT *hint_create_route(HINT *, HINT_TYPE, char *);
|
extern HINT *hint_create_route(HINT *, HINT_TYPE, char *);
|
||||||
extern void hint_free(HINT *);
|
extern void hint_free(HINT *);
|
||||||
extern HINT *hint_dup(HINT *);
|
extern HINT *hint_dup(HINT *);
|
||||||
|
bool hint_exists(HINT **, HINT_TYPE);
|
||||||
#endif
|
#endif
|
||||||
|
|||||||
@ -74,16 +74,16 @@ libhintfilter.so:
|
|||||||
|
|
||||||
clean:
|
clean:
|
||||||
rm -f $(OBJ) $(MODULES)
|
rm -f $(OBJ) $(MODULES)
|
||||||
(cd hint; make clean)
|
(cd hint; touch depend.mk; make clean)
|
||||||
|
|
||||||
tags:
|
tags:
|
||||||
ctags $(SRCS) $(HDRS)
|
ctags $(SRCS) $(HDRS)
|
||||||
(cd hint; make tags)
|
(cd hint; touch depend.mk; make tags)
|
||||||
|
|
||||||
depend:
|
depend:
|
||||||
@rm -f depend.mk
|
@rm -f depend.mk
|
||||||
cc -M $(CFLAGS) $(SRCS) > depend.mk
|
cc -M $(CFLAGS) $(SRCS) > depend.mk
|
||||||
(cd hint; make depend)
|
(cd hint; touch depend.mk; make depend)
|
||||||
|
|
||||||
install: $(MODULES)
|
install: $(MODULES)
|
||||||
install -D $(MODULES) $(DEST)/modules
|
install -D $(MODULES) $(DEST)/modules
|
||||||
|
|||||||
@ -18,11 +18,15 @@
|
|||||||
#include <stdio.h>
|
#include <stdio.h>
|
||||||
#include <ctype.h>
|
#include <ctype.h>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
|
#include <skygw_utils.h>
|
||||||
|
#include <log_manager.h>
|
||||||
#include <filter.h>
|
#include <filter.h>
|
||||||
#include <modinfo.h>
|
#include <modinfo.h>
|
||||||
#include <modutil.h>
|
#include <modutil.h>
|
||||||
#include <mysqlhint.h>
|
#include <mysqlhint.h>
|
||||||
|
|
||||||
|
extern int lm_enabled_logfiles_bitmask;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* hintparser.c - Find any comment in the SQL packet and look for MAXSCALE
|
* hintparser.c - Find any comment in the SQL packet and look for MAXSCALE
|
||||||
* hints in that comment.
|
* hints in that comment.
|
||||||
@ -49,15 +53,61 @@ struct {
|
|||||||
{ "server", TOK_SERVER },
|
{ "server", TOK_SERVER },
|
||||||
{ NULL, 0 }
|
{ NULL, 0 }
|
||||||
};
|
};
|
||||||
|
/**
|
||||||
|
HINT_TOKEN kwords[] = {
|
||||||
|
{ TOK_MAXSCALE, "maxscale" },
|
||||||
|
{ TOK_PREPARE, "prepare" },
|
||||||
|
{ TOK_START, "start" },
|
||||||
|
{ TOK_START, "begin" },
|
||||||
|
{ TOK_STOP, "stop" },
|
||||||
|
{ TOK_STOP, "end" },
|
||||||
|
{ TOK_EQUAL, "=" },
|
||||||
|
{ TOK_ROUTE, "route" },
|
||||||
|
{ TOK_TO, "to" },
|
||||||
|
{ TOK_MASTER, "master" },
|
||||||
|
{ TOK_SLAVE, "slave" },
|
||||||
|
{ TOK_SERVER, "server" },
|
||||||
|
{ 0, NULL}
|
||||||
|
};
|
||||||
|
*/
|
||||||
|
|
||||||
static HINT_TOKEN *hint_next_token(GWBUF **buf, char **ptr);
|
static HINT_TOKEN *hint_next_token(GWBUF **buf, char **ptr);
|
||||||
static void hint_pop(HINT_SESSION *);
|
static void hint_pop(HINT_SESSION *);
|
||||||
static HINT *lookup_named_hint(HINT_SESSION *, char *);
|
static HINT *lookup_named_hint(HINT_SESSION *, char *);
|
||||||
static void create_named_hint(HINT_SESSION *, char *, HINT *);
|
static void create_named_hint(HINT_SESSION *, char *, HINT *);
|
||||||
static void hint_push(HINT_SESSION *, HINT *);
|
static void hint_push(HINT_SESSION *, HINT *);
|
||||||
|
static const char* token_get_keyword (TOKEN_VALUE token);
|
||||||
|
|
||||||
typedef enum { HM_EXECUTE, HM_START, HM_PREPARE } HINT_MODE;
|
typedef enum { HM_EXECUTE, HM_START, HM_PREPARE } HINT_MODE;
|
||||||
|
|
||||||
|
static const char* token_get_keyword (
|
||||||
|
TOKEN_VALUE token)
|
||||||
|
{
|
||||||
|
switch (token) {
|
||||||
|
case TOK_EOL:
|
||||||
|
return "End of line";
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
{
|
||||||
|
int i = 0;
|
||||||
|
while (i < TOK_EOL && keywords[i].token != token)
|
||||||
|
i++;
|
||||||
|
|
||||||
|
ss_dassert(i != TOK_EOL);
|
||||||
|
|
||||||
|
if (i == TOK_EOL)
|
||||||
|
{
|
||||||
|
return "Unknown token";
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
return keywords[i].keyword;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
/**
|
/**
|
||||||
* Parse the hint comments in the MySQL statement passed in request.
|
* Parse the hint comments in the MySQL statement passed in request.
|
||||||
* Add any hints to the buffer for later processing.
|
* Add any hints to the buffer for later processing.
|
||||||
@ -160,8 +210,20 @@ HINT_MODE mode = HM_EXECUTE;
|
|||||||
}
|
}
|
||||||
|
|
||||||
tok = hint_next_token(&buf, &ptr);
|
tok = hint_next_token(&buf, &ptr);
|
||||||
|
|
||||||
|
if (tok == NULL)
|
||||||
|
{
|
||||||
|
goto retblock;
|
||||||
|
}
|
||||||
|
|
||||||
if (tok->token != TOK_MAXSCALE)
|
if (tok->token != TOK_MAXSCALE)
|
||||||
{
|
{
|
||||||
|
LOGIF(LE, (skygw_log_write_flush(
|
||||||
|
LOGFILE_ERROR,
|
||||||
|
"Error : Invalid hint string '%s'. Hint should start "
|
||||||
|
"with keyword '%s'",
|
||||||
|
token_get_keyword(tok->token),
|
||||||
|
token_get_keyword(TOK_MAXSCALE))));
|
||||||
free(tok);
|
free(tok);
|
||||||
goto retblock;
|
goto retblock;
|
||||||
}
|
}
|
||||||
@ -369,7 +431,7 @@ HINT_TOKEN *tok;
|
|||||||
return NULL;
|
return NULL;
|
||||||
tok->value = NULL;
|
tok->value = NULL;
|
||||||
dest = word;
|
dest = word;
|
||||||
while (*ptr <= (char *)((*buf)->end) || (*buf)->next)
|
while (*ptr < (char *)((*buf)->end) || (*buf)->next)
|
||||||
{
|
{
|
||||||
if (inword && inquote == '\0' &&
|
if (inword && inquote == '\0' &&
|
||||||
(**ptr == '=' || isspace(**ptr)))
|
(**ptr == '=' || isspace(**ptr)))
|
||||||
|
|||||||
@ -67,10 +67,22 @@ typedef enum backend_type_t {
|
|||||||
BE_UNDEFINED=-1,
|
BE_UNDEFINED=-1,
|
||||||
BE_MASTER,
|
BE_MASTER,
|
||||||
BE_JOINED = BE_MASTER,
|
BE_JOINED = BE_MASTER,
|
||||||
BE_SLAVE,
|
BE_SLAVE,
|
||||||
BE_COUNT
|
BE_COUNT
|
||||||
} backend_type_t;
|
} backend_type_t;
|
||||||
|
|
||||||
|
typedef enum {
|
||||||
|
TARGET_MASTER = 0x01,
|
||||||
|
TARGET_SLAVE = 0x02,
|
||||||
|
TARGET_NAMED_SERVER = 0x04,
|
||||||
|
TARGET_ALL = 0x08
|
||||||
|
} route_target_t;
|
||||||
|
|
||||||
|
#define TARGET_IS_MASTER(t) (t & TARGET_MASTER)
|
||||||
|
#define TARGET_IS_SLAVE(t) (t & TARGET_SLAVE)
|
||||||
|
#define TARGET_IS_NAMED_SERVER(t) (t & TARGET_NAMED_SERVER)
|
||||||
|
#define TARGET_IS_ALL(t) (t & TARGET_ALL)
|
||||||
|
|
||||||
typedef struct rses_property_st rses_property_t;
|
typedef struct rses_property_st rses_property_t;
|
||||||
typedef struct router_client_session ROUTER_CLIENT_SES;
|
typedef struct router_client_session ROUTER_CLIENT_SES;
|
||||||
|
|
||||||
|
|||||||
@ -97,6 +97,12 @@ static int rses_get_max_slavecount(ROUTER_CLIENT_SES* rses, int router_nservers
|
|||||||
static int rses_get_max_replication_lag(ROUTER_CLIENT_SES* rses);
|
static int rses_get_max_replication_lag(ROUTER_CLIENT_SES* rses);
|
||||||
static backend_ref_t* get_bref_from_dcb(ROUTER_CLIENT_SES* rses, DCB* dcb);
|
static backend_ref_t* get_bref_from_dcb(ROUTER_CLIENT_SES* rses, DCB* dcb);
|
||||||
|
|
||||||
|
static route_target_t get_route_target (
|
||||||
|
skygw_query_type_t qtype,
|
||||||
|
bool trx_active,
|
||||||
|
HINT* hint);
|
||||||
|
|
||||||
|
|
||||||
static uint8_t getCapabilities (ROUTER* inst, void* router_session);
|
static uint8_t getCapabilities (ROUTER* inst, void* router_session);
|
||||||
|
|
||||||
#if defined(NOT_USED)
|
#if defined(NOT_USED)
|
||||||
@ -153,7 +159,8 @@ static bool select_connect_backend_servers(
|
|||||||
static bool get_dcb(
|
static bool get_dcb(
|
||||||
DCB** dcb,
|
DCB** dcb,
|
||||||
ROUTER_CLIENT_SES* rses,
|
ROUTER_CLIENT_SES* rses,
|
||||||
backend_type_t btype);
|
backend_type_t btype,
|
||||||
|
char* name);
|
||||||
|
|
||||||
static void rwsplit_process_router_options(
|
static void rwsplit_process_router_options(
|
||||||
ROUTER_INSTANCE* router,
|
ROUTER_INSTANCE* router,
|
||||||
@ -912,19 +919,28 @@ static void freeSession(
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Provide a pointer to a suitable backend dcb.
|
* Provide the router with a pointer to a suitable backend dcb.
|
||||||
* Detect failures in server statuses and reselect backends if necessary.
|
* Detect failures in server statuses and reselect backends if necessary.
|
||||||
|
* If name is specified, server name becomes primary selection criteria.
|
||||||
|
*
|
||||||
|
* @param p_dcb Address of the pointer to the resulting DCB
|
||||||
|
* @param rses Pointer to router client session
|
||||||
|
* @param btype Backend type
|
||||||
|
* @param name Name of the backend which is primarily searched. May be NULL.
|
||||||
|
*
|
||||||
|
* @return True if proper DCB was found, false otherwise.
|
||||||
*/
|
*/
|
||||||
static bool get_dcb(
|
static bool get_dcb(
|
||||||
DCB** p_dcb,
|
DCB** p_dcb,
|
||||||
ROUTER_CLIENT_SES* rses,
|
ROUTER_CLIENT_SES* rses,
|
||||||
backend_type_t btype)
|
backend_type_t btype,
|
||||||
|
char* name)
|
||||||
{
|
{
|
||||||
backend_ref_t* backend_ref;
|
backend_ref_t* backend_ref;
|
||||||
int smallest_nconn = -1;
|
int smallest_nconn = -1;
|
||||||
int i;
|
int i;
|
||||||
bool succp = false;
|
bool succp = false;
|
||||||
BACKEND *master_host = NULL;
|
BACKEND* master_host;
|
||||||
|
|
||||||
CHK_CLIENT_RSES(rses);
|
CHK_CLIENT_RSES(rses);
|
||||||
ss_dassert(p_dcb != NULL && *(p_dcb) == NULL);
|
ss_dassert(p_dcb != NULL && *(p_dcb) == NULL);
|
||||||
@ -935,55 +951,87 @@ static bool get_dcb(
|
|||||||
}
|
}
|
||||||
backend_ref = rses->rses_backend_ref;
|
backend_ref = rses->rses_backend_ref;
|
||||||
|
|
||||||
/* get root master from availbal servers */
|
/* get root master from available servers */
|
||||||
master_host = get_root_master(backend_ref, rses->rses_nbackends);
|
master_host = get_root_master(backend_ref, rses->rses_nbackends);
|
||||||
|
|
||||||
if (btype == BE_SLAVE)
|
if (btype == BE_SLAVE)
|
||||||
{
|
{
|
||||||
for (i=0; i<rses->rses_nbackends; i++)
|
if (name != NULL) /*< Choose backend by name (hint) */
|
||||||
{
|
{
|
||||||
BACKEND* b = backend_ref[i].bref_backend;
|
for (i=0; i<rses->rses_nbackends; i++)
|
||||||
/* check slave bit, also for relay servers (Master & Servers) */
|
|
||||||
if (BREF_IS_IN_USE((&backend_ref[i])) &&
|
|
||||||
(SERVER_IS_SLAVE(b->backend_server) || SERVER_IS_RELAY_SERVER(b->backend_server)) &&
|
|
||||||
(master_host != NULL && b->backend_server != master_host->backend_server) &&
|
|
||||||
(smallest_nconn == -1 ||
|
|
||||||
b->backend_conn_count < smallest_nconn))
|
|
||||||
{
|
{
|
||||||
*p_dcb = backend_ref[i].bref_dcb;
|
BACKEND* b = backend_ref[i].bref_backend;
|
||||||
smallest_nconn = b->backend_conn_count;
|
|
||||||
succp = true;
|
/**
|
||||||
ss_dassert(backend_ref[i].bref_dcb->state != DCB_STATE_ZOMBIE);
|
* To become chosen:
|
||||||
|
* backend must be in use, name must match,
|
||||||
|
* root master node must be found,
|
||||||
|
* backend's role must be either slave, relay
|
||||||
|
* server, or master.
|
||||||
|
*/
|
||||||
|
if (BREF_IS_IN_USE((&backend_ref[i])) &&
|
||||||
|
(strncasecmp(
|
||||||
|
name,
|
||||||
|
b->backend_server->unique_name,
|
||||||
|
MIN(strlen(b->backend_server->unique_name), PATH_MAX)) == 0) &&
|
||||||
|
master_host != NULL &&
|
||||||
|
(SERVER_IS_SLAVE(b->backend_server) ||
|
||||||
|
SERVER_IS_RELAY_SERVER(b->backend_server) ||
|
||||||
|
SERVER_IS_MASTER(b->backend_server)))
|
||||||
|
{
|
||||||
|
*p_dcb = backend_ref[i].bref_dcb;
|
||||||
|
succp = true;
|
||||||
|
ss_dassert(backend_ref[i].bref_dcb->state != DCB_STATE_ZOMBIE);
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!succp)
|
if (!succp) /*< No hints or finding named backend failed */
|
||||||
{
|
{
|
||||||
backend_ref = rses->rses_master_ref;
|
for (i=0; i<rses->rses_nbackends; i++)
|
||||||
|
|
||||||
if (BREF_IS_IN_USE(backend_ref))
|
|
||||||
{
|
{
|
||||||
*p_dcb = backend_ref->bref_dcb;
|
BACKEND* b = backend_ref[i].bref_backend;
|
||||||
succp = true;
|
/**
|
||||||
|
* To become chosen:
|
||||||
ss_dassert(backend_ref->bref_dcb->state != DCB_STATE_ZOMBIE);
|
* backend must be in use,
|
||||||
|
* root master node must be found,
|
||||||
ss_dassert(
|
* backend is not allowed to be the master,
|
||||||
(master_host && (backend_ref->bref_backend->backend_server == master_host->backend_server)) &&
|
* backend's role can be either slave or relay
|
||||||
smallest_nconn == -1);
|
* server and it must have least connections
|
||||||
|
* at the moment.
|
||||||
LOGIF(LE, (skygw_log_write_flush(
|
*/
|
||||||
LOGFILE_ERROR,
|
if (BREF_IS_IN_USE((&backend_ref[i])) &&
|
||||||
"Warning : No slaves connected nor "
|
master_host != NULL &&
|
||||||
"available. Choosing master %s:%d "
|
b->backend_server != master_host->backend_server &&
|
||||||
"instead.",
|
(SERVER_IS_SLAVE(b->backend_server) ||
|
||||||
backend_ref->bref_backend->backend_server->name,
|
SERVER_IS_RELAY_SERVER(b->backend_server)) &&
|
||||||
backend_ref->bref_backend->backend_server->port)));
|
(smallest_nconn == -1 ||
|
||||||
|
b->backend_conn_count < smallest_nconn))
|
||||||
|
{
|
||||||
|
*p_dcb = backend_ref[i].bref_dcb;
|
||||||
|
smallest_nconn = b->backend_conn_count;
|
||||||
|
succp = true;
|
||||||
|
ss_dassert(backend_ref[i].bref_dcb->state != DCB_STATE_ZOMBIE);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ss_dassert(succp);
|
|
||||||
|
if (!succp) /*< No valid slave was found, search master next */
|
||||||
|
{
|
||||||
|
btype = BE_MASTER;
|
||||||
|
|
||||||
|
LOGIF(LE, (skygw_log_write_flush(
|
||||||
|
LOGFILE_ERROR,
|
||||||
|
"Warning : No slaves connected nor "
|
||||||
|
"available. Choosing master %s:%d "
|
||||||
|
"instead.",
|
||||||
|
backend_ref->bref_backend->backend_server->name,
|
||||||
|
backend_ref->bref_backend->backend_server->port)));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
else if (btype == BE_MASTER)
|
|
||||||
|
if (btype == BE_MASTER)
|
||||||
{
|
{
|
||||||
for (i=0; i<rses->rses_nbackends; i++)
|
for (i=0; i<rses->rses_nbackends; i++)
|
||||||
{
|
{
|
||||||
@ -998,10 +1046,73 @@ static bool get_dcb(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return_succp:
|
return_succp:
|
||||||
return succp;
|
return succp;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Examine the query type, transaction state and routing hints. Find out the
|
||||||
|
* target for query routing.
|
||||||
|
*
|
||||||
|
* @param qtype Type of query
|
||||||
|
* @param trx_active Is transacation active or not
|
||||||
|
* @param hint Pointer to list of hints attached to the query buffer
|
||||||
|
*
|
||||||
|
* @return bitfield including the routing target, or the target server name
|
||||||
|
* if the query would otherwise be routed to slave.
|
||||||
|
*/
|
||||||
|
static route_target_t get_route_target (
|
||||||
|
skygw_query_type_t qtype,
|
||||||
|
bool trx_active,
|
||||||
|
HINT* hint)
|
||||||
|
{
|
||||||
|
route_target_t target;
|
||||||
|
|
||||||
|
if (QUERY_IS_TYPE(qtype, QUERY_TYPE_SESSION_WRITE) ||
|
||||||
|
QUERY_IS_TYPE(qtype, QUERY_TYPE_PREPARE_STMT) ||
|
||||||
|
QUERY_IS_TYPE(qtype, QUERY_TYPE_PREPARE_NAMED_STMT))
|
||||||
|
{
|
||||||
|
/** hints don't affect on routing */
|
||||||
|
target = TARGET_ALL;
|
||||||
|
}
|
||||||
|
else if (QUERY_IS_TYPE(qtype, QUERY_TYPE_READ) && !trx_active)
|
||||||
|
{
|
||||||
|
target = TARGET_SLAVE;
|
||||||
|
|
||||||
|
/** process routing hints */
|
||||||
|
while (hint != NULL)
|
||||||
|
{
|
||||||
|
if (hint->type == HINT_ROUTE_TO_MASTER)
|
||||||
|
{
|
||||||
|
target = TARGET_MASTER; /*< override */
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
else if (hint->type == HINT_ROUTE_TO_NAMED_SERVER)
|
||||||
|
{
|
||||||
|
target |= TARGET_NAMED_SERVER; /*< add */
|
||||||
|
}
|
||||||
|
else if (hint->type == HINT_ROUTE_TO_UPTODATE_SERVER)
|
||||||
|
{
|
||||||
|
/** not implemented */
|
||||||
|
}
|
||||||
|
else if (hint->type == HINT_ROUTE_TO_ALL)
|
||||||
|
{
|
||||||
|
/** not implemented */
|
||||||
|
}
|
||||||
|
hint = hint->next;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/** hints don't affect on routing */
|
||||||
|
target = TARGET_MASTER;
|
||||||
|
}
|
||||||
|
|
||||||
|
return target;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The main routing entry, this is called with every packet that is
|
* The main routing entry, this is called with every packet that is
|
||||||
* received and has to be forwarded to the backend database.
|
* received and has to be forwarded to the backend database.
|
||||||
@ -1038,12 +1149,13 @@ static int routeQuery(
|
|||||||
uint8_t* packet;
|
uint8_t* packet;
|
||||||
int ret = 0;
|
int ret = 0;
|
||||||
DCB* master_dcb = NULL;
|
DCB* master_dcb = NULL;
|
||||||
DCB* slave_dcb = NULL;
|
DCB* target_dcb = NULL;
|
||||||
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance;
|
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance;
|
||||||
ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
|
ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
|
||||||
bool rses_is_closed = false;
|
bool rses_is_closed = false;
|
||||||
size_t len;
|
size_t len;
|
||||||
MYSQL* mysql = NULL;
|
MYSQL* mysql = NULL;
|
||||||
|
route_target_t route_target;
|
||||||
|
|
||||||
CHK_CLIENT_RSES(router_cli_ses);
|
CHK_CLIENT_RSES(router_cli_ses);
|
||||||
|
|
||||||
@ -1191,12 +1303,20 @@ static int routeQuery(
|
|||||||
router_cli_ses->rses_autocommit_enabled = true;
|
router_cli_ses->rses_autocommit_enabled = true;
|
||||||
router_cli_ses->rses_transaction_active = false;
|
router_cli_ses->rses_transaction_active = false;
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
* Session update is always routed in the same way.
|
* Find out where to route the query. Result may not be clear; it is
|
||||||
|
* possible to have a hint for routing to a named server which can
|
||||||
|
* be either slave or master.
|
||||||
|
* If query would otherwise be routed to slave then the hint determines
|
||||||
|
* actual target server if it exists.
|
||||||
|
*
|
||||||
|
* route_target is a bitfield and may include multiple values.
|
||||||
*/
|
*/
|
||||||
if (QUERY_IS_TYPE(qtype, QUERY_TYPE_SESSION_WRITE) ||
|
route_target = get_route_target(qtype,
|
||||||
QUERY_IS_TYPE(qtype, QUERY_TYPE_PREPARE_STMT) ||
|
router_cli_ses->rses_transaction_active,
|
||||||
QUERY_IS_TYPE(qtype, QUERY_TYPE_PREPARE_NAMED_STMT))
|
querybuf->hint);
|
||||||
|
|
||||||
|
if (TARGET_IS_ALL(route_target))
|
||||||
{
|
{
|
||||||
/**
|
/**
|
||||||
* It is not sure if the session command in question requires
|
* It is not sure if the session command in question requires
|
||||||
@ -1215,35 +1335,76 @@ static int routeQuery(
|
|||||||
}
|
}
|
||||||
goto return_ret;
|
goto return_ret;
|
||||||
}
|
}
|
||||||
else if (QUERY_IS_TYPE(qtype, QUERY_TYPE_READ) &&
|
/**
|
||||||
!router_cli_ses->rses_transaction_active)
|
* Handle routing to master and to slave
|
||||||
|
*/
|
||||||
|
else
|
||||||
{
|
{
|
||||||
bool succp;
|
bool succp = true;
|
||||||
|
HINT* hint;
|
||||||
|
char* named_server = NULL;
|
||||||
|
|
||||||
LOGIF(LT, (skygw_log_write(
|
if (router_cli_ses->rses_transaction_active) /*< all to master */
|
||||||
LOGFILE_TRACE,
|
{
|
||||||
"[%s]\tRead-only query, routing to Slave.",
|
route_target = TARGET_MASTER; /*< override old value */
|
||||||
inst->service->name)));
|
|
||||||
ss_dassert(QUERY_IS_TYPE(qtype, QUERY_TYPE_READ));
|
LOGIF(LT, (skygw_log_write(
|
||||||
|
LOGFILE_TRACE,
|
||||||
|
"Transaction is active, routing to Master.")));
|
||||||
|
}
|
||||||
|
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, "%s", STRQTYPE(qtype))));
|
||||||
|
|
||||||
/** Lock router session */
|
/** Lock router session */
|
||||||
if (!rses_begin_locked_router_action(router_cli_ses))
|
if (!rses_begin_locked_router_action(router_cli_ses))
|
||||||
{
|
{
|
||||||
goto return_ret;
|
goto return_ret;
|
||||||
}
|
}
|
||||||
succp = get_dcb(&slave_dcb, router_cli_ses, BE_SLAVE);
|
|
||||||
|
if (TARGET_IS_SLAVE(route_target))
|
||||||
|
{
|
||||||
|
if (TARGET_IS_NAMED_SERVER(route_target))
|
||||||
|
{
|
||||||
|
hint = querybuf->hint;
|
||||||
|
|
||||||
|
while (hint != NULL &&
|
||||||
|
hint->type != HINT_ROUTE_TO_NAMED_SERVER)
|
||||||
|
{
|
||||||
|
hint = hint->next;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (hint != NULL)
|
||||||
|
{
|
||||||
|
named_server = hint->data;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
succp = get_dcb(&target_dcb,
|
||||||
|
router_cli_ses,
|
||||||
|
BE_SLAVE,
|
||||||
|
named_server);
|
||||||
|
}
|
||||||
|
else if (TARGET_IS_MASTER(route_target))
|
||||||
|
{
|
||||||
|
if (master_dcb == NULL)
|
||||||
|
{
|
||||||
|
succp = get_dcb(&master_dcb,
|
||||||
|
router_cli_ses,
|
||||||
|
BE_MASTER,
|
||||||
|
NULL);
|
||||||
|
}
|
||||||
|
target_dcb = master_dcb;
|
||||||
|
}
|
||||||
|
|
||||||
if (succp)
|
if (succp) /*< Have DCB of the target backend */
|
||||||
{
|
{
|
||||||
if ((ret = slave_dcb->func.write(slave_dcb, querybuf)) == 1)
|
if ((ret = target_dcb->func.write(target_dcb, querybuf)) == 1)
|
||||||
{
|
{
|
||||||
backend_ref_t* bref;
|
backend_ref_t* bref;
|
||||||
|
|
||||||
atomic_add(&inst->stats.n_slave, 1);
|
atomic_add(&inst->stats.n_slave, 1);
|
||||||
/**
|
/**
|
||||||
* Add one query response waiter to backend reference
|
* Add one query response waiter to backend reference
|
||||||
*/
|
*/
|
||||||
bref = get_bref_from_dcb(router_cli_ses, slave_dcb);
|
bref = get_bref_from_dcb(router_cli_ses, target_dcb);
|
||||||
bref_set_state(bref, BREF_QUERY_ACTIVE);
|
bref_set_state(bref, BREF_QUERY_ACTIVE);
|
||||||
bref_set_state(bref, BREF_WAITING_RESULT);
|
bref_set_state(bref, BREF_WAITING_RESULT);
|
||||||
}
|
}
|
||||||
@ -1256,68 +1417,8 @@ static int routeQuery(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
rses_end_locked_router_action(router_cli_ses);
|
rses_end_locked_router_action(router_cli_ses);
|
||||||
|
|
||||||
ss_dassert(succp);
|
|
||||||
goto return_ret;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
bool succp = true;
|
|
||||||
|
|
||||||
if (LOG_IS_ENABLED(LOGFILE_TRACE))
|
|
||||||
{
|
|
||||||
if (router_cli_ses->rses_transaction_active) /*< all to master */
|
|
||||||
{
|
|
||||||
LOGIF(LT, (skygw_log_write(
|
|
||||||
LOGFILE_TRACE,
|
|
||||||
"Transaction is active, routing to Master.")));
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
LOGIF(LT, (skygw_log_write(
|
|
||||||
LOGFILE_TRACE,
|
|
||||||
"Begin transaction, write or unspecified type, "
|
|
||||||
"routing to Master.")));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
/** Lock router session */
|
|
||||||
if (!rses_begin_locked_router_action(router_cli_ses))
|
|
||||||
{
|
|
||||||
goto return_ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (master_dcb == NULL)
|
|
||||||
{
|
|
||||||
succp = get_dcb(&master_dcb, router_cli_ses, BE_MASTER);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (succp)
|
|
||||||
{
|
|
||||||
if ((ret = master_dcb->func.write(master_dcb, querybuf)) == 1)
|
|
||||||
{
|
|
||||||
backend_ref_t* bref;
|
|
||||||
|
|
||||||
atomic_add(&inst->stats.n_master, 1);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Add one write response waiter to backend reference
|
|
||||||
*/
|
|
||||||
bref = get_bref_from_dcb(router_cli_ses, master_dcb);
|
|
||||||
bref_set_state(bref, BREF_QUERY_ACTIVE);
|
|
||||||
bref_set_state(bref, BREF_WAITING_RESULT);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
rses_end_locked_router_action(router_cli_ses);
|
|
||||||
|
|
||||||
ss_dassert(succp);
|
|
||||||
|
|
||||||
if (ret == 0)
|
|
||||||
{
|
|
||||||
LOGIF(LE, (skygw_log_write_flush(
|
|
||||||
LOGFILE_ERROR,
|
|
||||||
"Error : Routing to master failed.")));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return_ret:
|
return_ret:
|
||||||
if (plainsqlbuf != NULL)
|
if (plainsqlbuf != NULL)
|
||||||
{
|
{
|
||||||
@ -1335,7 +1436,7 @@ return_ret:
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/** to be inline'd */
|
|
||||||
/**
|
/**
|
||||||
* @node Acquires lock to router client session if it is not closed.
|
* @node Acquires lock to router client session if it is not closed.
|
||||||
*
|
*
|
||||||
@ -2073,7 +2174,8 @@ static bool select_connect_backend_servers(
|
|||||||
}
|
}
|
||||||
/* assert with master_host */
|
/* assert with master_host */
|
||||||
ss_dassert(!master_connected ||
|
ss_dassert(!master_connected ||
|
||||||
(master_host && ((*p_master_ref)->bref_backend->backend_server == master_host->backend_server) && SERVER_MASTER));
|
(master_host && ((*p_master_ref)->bref_backend->backend_server == master_host->backend_server) &&
|
||||||
|
SERVER_MASTER));
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -2667,7 +2769,7 @@ static bool execute_sescmd_in_backend(
|
|||||||
dcb->fd,
|
dcb->fd,
|
||||||
STRPACKETTYPE(cmd))));
|
STRPACKETTYPE(cmd))));
|
||||||
}
|
}
|
||||||
#endif
|
#endif /*< SS_DEBUG */
|
||||||
switch (scur->scmd_cur_cmd->my_sescmd_packet_type) {
|
switch (scur->scmd_cur_cmd->my_sescmd_packet_type) {
|
||||||
case MYSQL_COM_CHANGE_USER:
|
case MYSQL_COM_CHANGE_USER:
|
||||||
rc = dcb->func.auth(
|
rc = dcb->func.auth(
|
||||||
@ -2961,7 +3063,6 @@ static bool route_session_write(
|
|||||||
/** Lock router session */
|
/** Lock router session */
|
||||||
if (!rses_begin_locked_router_action(router_cli_ses))
|
if (!rses_begin_locked_router_action(router_cli_ses))
|
||||||
{
|
{
|
||||||
rses_property_done(prop);
|
|
||||||
succp = false;
|
succp = false;
|
||||||
goto return_succp;
|
goto return_succp;
|
||||||
}
|
}
|
||||||
@ -3031,6 +3132,7 @@ return_succp:
|
|||||||
}
|
}
|
||||||
|
|
||||||
#if defined(NOT_USED)
|
#if defined(NOT_USED)
|
||||||
|
|
||||||
static bool router_option_configured(
|
static bool router_option_configured(
|
||||||
ROUTER_INSTANCE* router,
|
ROUTER_INSTANCE* router,
|
||||||
const char* optionstr,
|
const char* optionstr,
|
||||||
@ -3065,7 +3167,7 @@ static bool router_option_configured(
|
|||||||
}
|
}
|
||||||
return succp;
|
return succp;
|
||||||
}
|
}
|
||||||
#endif
|
#endif /*< NOT_USED */
|
||||||
|
|
||||||
static void rwsplit_process_router_options(
|
static void rwsplit_process_router_options(
|
||||||
ROUTER_INSTANCE* router,
|
ROUTER_INSTANCE* router,
|
||||||
@ -3346,7 +3448,7 @@ static void print_error_packet(
|
|||||||
{
|
{
|
||||||
while ((buf = gwbuf_consume(buf, GWBUF_LENGTH(buf))) != NULL);
|
while ((buf = gwbuf_consume(buf, GWBUF_LENGTH(buf))) != NULL);
|
||||||
}
|
}
|
||||||
#endif
|
#endif /*< SS_DEBUG */
|
||||||
}
|
}
|
||||||
|
|
||||||
static int router_get_servercount(
|
static int router_get_servercount(
|
||||||
@ -3568,10 +3670,10 @@ static prep_stmt_t* prep_stmt_init(
|
|||||||
|
|
||||||
if (pstmt != NULL)
|
if (pstmt != NULL)
|
||||||
{
|
{
|
||||||
#if defined(SS_DEBUG)
|
#if defined(SS_DEBUG)
|
||||||
pstmt->pstmt_chk_top = CHK_NUM_PREP_STMT;
|
pstmt->pstmt_chk_top = CHK_NUM_PREP_STMT;
|
||||||
pstmt->pstmt_chk_tail = CHK_NUM_PREP_STMT;
|
pstmt->pstmt_chk_tail = CHK_NUM_PREP_STMT;
|
||||||
#endif
|
#endif
|
||||||
pstmt->pstmt_state = PREP_STMT_ALLOC;
|
pstmt->pstmt_state = PREP_STMT_ALLOC;
|
||||||
pstmt->pstmt_type = type;
|
pstmt->pstmt_type = type;
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user