Merge remote-tracking branch 'origin/Z3' into MAX-237

Conflicts:
	query_classifier/query_classifier.cc
	query_classifier/query_classifier.h
	server/modules/routing/readwritesplit/readwritesplit.c
This commit is contained in:
Markus Makela
2014-09-01 11:05:10 +03:00
46 changed files with 3473 additions and 331 deletions

View File

@ -98,6 +98,12 @@ static int rses_get_max_slavecount(ROUTER_CLIENT_SES* rses, int router_nservers
static int rses_get_max_replication_lag(ROUTER_CLIENT_SES* rses);
static backend_ref_t* get_bref_from_dcb(ROUTER_CLIENT_SES* rses, DCB* dcb);
static route_target_t get_route_target (
skygw_query_type_t qtype,
bool trx_active,
HINT* hint);
static uint8_t getCapabilities (ROUTER* inst, void* router_session);
#if defined(NOT_USED)
@ -154,7 +160,9 @@ static bool select_connect_backend_servers(
static bool get_dcb(
DCB** dcb,
ROUTER_CLIENT_SES* rses,
backend_type_t btype);
backend_type_t btype,
char* name,
int max_rlag);
static void rwsplit_process_router_options(
ROUTER_INSTANCE* router,
@ -951,19 +959,29 @@ static void freeSession(
}
/**
* Provide a pointer to a suitable backend dcb.
* Provide the router with a pointer to a suitable backend dcb.
* Detect failures in server statuses and reselect backends if necessary.
* If name is specified, server name becomes primary selection criteria.
*
* @param p_dcb Address of the pointer to the resulting DCB
* @param rses Pointer to router client session
* @param btype Backend type
* @param name Name of the backend which is primarily searched. May be NULL.
*
* @return True if proper DCB was found, false otherwise.
*/
static bool get_dcb(
DCB** p_dcb,
ROUTER_CLIENT_SES* rses,
backend_type_t btype)
backend_type_t btype,
char* name,
int max_rlag)
{
backend_ref_t* backend_ref;
int smallest_nconn = -1;
int i;
bool succp = false;
BACKEND *master_host = NULL;
BACKEND* master_host;
CHK_CLIENT_RSES(rses);
ss_dassert(p_dcb != NULL && *(p_dcb) == NULL);
@ -974,55 +992,95 @@ static bool get_dcb(
}
backend_ref = rses->rses_backend_ref;
/* get root master from availbal servers */
/** get root master from available servers */
master_host = get_root_master(backend_ref, rses->rses_nbackends);
if (btype == BE_SLAVE)
{
for (i=0; i<rses->rses_nbackends; i++)
if (name != NULL) /*< Choose backend by name (hint) */
{
BACKEND* b = backend_ref[i].bref_backend;
/* check slave bit, also for relay servers (Master & Servers) */
if (BREF_IS_IN_USE((&backend_ref[i])) &&
(SERVER_IS_SLAVE(b->backend_server) || SERVER_IS_RELAY_SERVER(b->backend_server)) &&
(master_host != NULL && b->backend_server != master_host->backend_server) &&
(smallest_nconn == -1 ||
b->backend_conn_count < smallest_nconn))
for (i=0; i<rses->rses_nbackends; i++)
{
*p_dcb = backend_ref[i].bref_dcb;
smallest_nconn = b->backend_conn_count;
succp = true;
ss_dassert(backend_ref[i].bref_dcb->state != DCB_STATE_ZOMBIE);
BACKEND* b = backend_ref[i].bref_backend;
/**
* To become chosen:
* backend must be in use, name must match,
* root master node must be found,
* backend's role must be either slave, relay
* server, or master.
*/
if (BREF_IS_IN_USE((&backend_ref[i])) &&
(strncasecmp(
name,
b->backend_server->unique_name,
MIN(strlen(b->backend_server->unique_name), PATH_MAX)) == 0) &&
master_host != NULL &&
#if 0
(max_rlag == MAX_RLAG_UNDEFINED ||
(b->backend_server->rlag != MAX_RLAG_NOT_AVAILABLE &&
b->backend_server->rlag <= max_rlag)) &&
#endif
(SERVER_IS_SLAVE(b->backend_server) ||
SERVER_IS_RELAY_SERVER(b->backend_server) ||
SERVER_IS_MASTER(b->backend_server)))
{
*p_dcb = backend_ref[i].bref_dcb;
succp = true;
ss_dassert(backend_ref[i].bref_dcb->state != DCB_STATE_ZOMBIE);
break;
}
}
}
if (!succp)
if (!succp) /*< No hints or finding named backend failed */
{
backend_ref = rses->rses_master_ref;
if (BREF_IS_IN_USE(backend_ref))
for (i=0; i<rses->rses_nbackends; i++)
{
*p_dcb = backend_ref->bref_dcb;
succp = true;
ss_dassert(backend_ref->bref_dcb->state != DCB_STATE_ZOMBIE);
ss_dassert(
(master_host && (backend_ref->bref_backend->backend_server == master_host->backend_server)) &&
smallest_nconn == -1);
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Warning : No slaves connected nor "
"available. Choosing master %s:%d "
"instead.",
backend_ref->bref_backend->backend_server->name,
backend_ref->bref_backend->backend_server->port)));
BACKEND* b = backend_ref[i].bref_backend;
/**
* To become chosen:
* backend must be in use,
* root master node must be found,
* backend is not allowed to be the master,
* backend's role can be either slave or relay
* server and it must have least connections
* at the moment.
*/
if (BREF_IS_IN_USE((&backend_ref[i])) &&
master_host != NULL &&
b->backend_server != master_host->backend_server &&
(max_rlag == MAX_RLAG_UNDEFINED ||
(b->backend_server->rlag != MAX_RLAG_NOT_AVAILABLE &&
b->backend_server->rlag <= max_rlag)) &&
(SERVER_IS_SLAVE(b->backend_server) ||
SERVER_IS_RELAY_SERVER(b->backend_server)) &&
(smallest_nconn == -1 ||
b->backend_conn_count < smallest_nconn))
{
*p_dcb = backend_ref[i].bref_dcb;
smallest_nconn = b->backend_conn_count;
succp = true;
ss_dassert(backend_ref[i].bref_dcb->state != DCB_STATE_ZOMBIE);
}
}
}
ss_dassert(succp);
}
if (!succp) /*< No valid slave was found, search master next */
{
btype = BE_MASTER;
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Warning : No slaves connected nor "
"available. Choosing master %s:%d "
"instead.",
backend_ref->bref_backend->backend_server->name,
backend_ref->bref_backend->backend_server->port)));
}
}
else if (btype == BE_MASTER)
if (btype == BE_MASTER)
{
for (i=0; i<rses->rses_nbackends; i++)
{
@ -1037,10 +1095,107 @@ static bool get_dcb(
}
}
}
return_succp:
return succp;
}
/**
* Examine the query type, transaction state and routing hints. Find out the
* target for query routing.
*
* @param qtype Type of query
* @param trx_active Is transacation active or not
* @param hint Pointer to list of hints attached to the query buffer
*
* @return bitfield including the routing target, or the target server name
* if the query would otherwise be routed to slave.
*/
static route_target_t get_route_target (
skygw_query_type_t qtype,
bool trx_active,
HINT* hint)
{
route_target_t target;
if (QUERY_IS_TYPE(qtype, QUERY_TYPE_SESSION_WRITE) ||
QUERY_IS_TYPE(qtype, QUERY_TYPE_PREPARE_STMT) ||
QUERY_IS_TYPE(qtype, QUERY_TYPE_PREPARE_NAMED_STMT))
{
/** hints don't affect on routing */
target = TARGET_ALL;
}
else if (QUERY_IS_TYPE(qtype, QUERY_TYPE_READ) && !trx_active)
{
target = TARGET_SLAVE;
/** process routing hints */
while (hint != NULL)
{
if (hint->type == HINT_ROUTE_TO_MASTER)
{
target = TARGET_MASTER; /*< override */
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Hint: route to master.")));
break;
}
else if (hint->type == HINT_ROUTE_TO_NAMED_SERVER)
{
target |= TARGET_NAMED_SERVER; /*< add */
}
else if (hint->type == HINT_ROUTE_TO_UPTODATE_SERVER)
{
/** not implemented */
}
else if (hint->type == HINT_ROUTE_TO_ALL)
{
/** not implemented */
}
else if (hint->type == HINT_PARAMETER)
{
if (strncasecmp(
(char *)hint->data,
"max_slave_replication_lag",
strlen("max_slave_replication_lag")) == 0)
{
target |= TARGET_RLAG_MAX;
}
else
{
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Error : Unknown hint parameter "
"'%s' when 'max_slave_replication_lag' "
"was expected.",
(char *)hint->data)));
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Unknown hint parameter "
"'%s' when 'max_slave_replication_lag' "
"was expected.",
(char *)hint->data)));
}
}
else if (hint->type == HINT_ROUTE_TO_SLAVE)
{
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Hint: route to slave.")));
}
hint = hint->next;
} /*< while (hint != NULL) */
}
else
{
/** hints don't affect on routing */
target = TARGET_MASTER;
}
return target;
}
/**
* The main routing entry, this is called with every packet that is
* received and has to be forwarded to the backend database.
@ -1070,6 +1225,9 @@ static int routeQuery(
GWBUF* querybuf)
{
skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN;
GWBUF* plainsqlbuf = NULL;
char* querystr = NULL;
char* startpos;
mysql_server_cmd_t packet_type;
uint8_t* packet;
int ret = 0,
@ -1077,7 +1235,7 @@ static int routeQuery(
klen = 0,
i = 0;
DCB* master_dcb = NULL;
DCB* slave_dcb = NULL;
DCB* target_dcb = NULL;
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance;
ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
rses_property_t* rses_prop_tmp;
@ -1087,6 +1245,10 @@ static int routeQuery(
char** tbl;
HASHTABLE* h;
MYSQL_session* data;
size_t len;
MYSQL* mysql = NULL;
route_target_t route_target;
CHK_CLIENT_RSES(router_cli_ses);
@ -1256,12 +1418,20 @@ static int routeQuery(
router_cli_ses->rses_autocommit_enabled = true;
router_cli_ses->rses_transaction_active = false;
}
/**
* Session update is always routed in the same way.
/**
* Find out where to route the query. Result may not be clear; it is
* possible to have a hint for routing to a named server which can
* be either slave or master.
* If query would otherwise be routed to slave then the hint determines
* actual target server if it exists.
*
* route_target is a bitfield and may include multiple values.
*/
if (QUERY_IS_TYPE(qtype, QUERY_TYPE_SESSION_WRITE) ||
QUERY_IS_TYPE(qtype, QUERY_TYPE_PREPARE_STMT) ||
QUERY_IS_TYPE(qtype, QUERY_TYPE_PREPARE_NAMED_STMT))
route_target = get_route_target(qtype,
router_cli_ses->rses_transaction_active,
querybuf->hint);
if (TARGET_IS_ALL(route_target))
{
/**
* It is not sure if the session command in question requires
@ -1280,81 +1450,102 @@ static int routeQuery(
}
goto return_ret;
}
else if (QUERY_IS_TYPE(qtype, QUERY_TYPE_READ) &&
!router_cli_ses->rses_transaction_active)
{
bool succp;
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"[%s]\tRead-only query, routing to Slave.",
inst->service->name)));
ss_dassert(QUERY_IS_TYPE(qtype, QUERY_TYPE_READ));
/** Lock router session */
if (!rses_begin_locked_router_action(router_cli_ses))
{
goto return_ret;
}
succp = get_dcb(&slave_dcb, router_cli_ses, BE_SLAVE);
if (succp)
{
if ((ret = slave_dcb->func.write(slave_dcb, gwbuf_clone(querybuf))) == 1)
{
backend_ref_t* bref;
atomic_add(&inst->stats.n_slave, 1);
/**
* Add one query response waiter to backend reference
*/
bref = get_bref_from_dcb(router_cli_ses, slave_dcb);
bref_set_state(bref, BREF_QUERY_ACTIVE);
bref_set_state(bref, BREF_WAITING_RESULT);
}
else
{
char* query_str = modutil_get_query(querybuf);
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Routing query \"%s\" failed.",
(query_str == NULL ? "not available" : query_str))));
free(query_str);
}
}
rses_end_locked_router_action(router_cli_ses);
ss_dassert(succp);
goto return_ret;
}
/**
* qtype is QUERY_TYPE_WRITE or QUERY_TYPE_READ_TMP_TABLE
*/
* Handle routing to master and to slave
*/
else
{
bool succp = true;
if (LOG_IS_ENABLED(LOGFILE_TRACE))
bool succp = true;
HINT* hint;
char* named_server = NULL;
int rlag_max = MAX_RLAG_UNDEFINED;
if (router_cli_ses->rses_transaction_active) /*< all to master */
{
if (router_cli_ses->rses_transaction_active) /*< all to master */
{
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Transaction is active, routing to Master.")));
}
else
{
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Begin transaction, write or unspecified type, "
"routing to Master.")));
}
route_target = TARGET_MASTER; /*< override old value */
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Transaction is active, routing to Master.")));
}
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, "%s", STRQTYPE(qtype))));
/** Lock router session */
if (!rses_begin_locked_router_action(router_cli_ses))
{
goto return_ret;
}
}
if (TARGET_IS_SLAVE(route_target))
{
if (TARGET_IS_NAMED_SERVER(route_target) ||
TARGET_IS_RLAG_MAX(route_target))
{
hint = querybuf->hint;
while (hint != NULL)
{
if (hint->type == HINT_ROUTE_TO_NAMED_SERVER)
{
named_server = hint->data;
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Hint: route to server "
"'%s'",
named_server)));
}
else if (hint->type == HINT_PARAMETER &&
(strncasecmp(
(char *)hint->data,
"max_slave_replication_lag",
strlen("max_slave_replication_lag")) == 0))
{
int val = (int) strtol((char *)hint->value,
(char **)NULL, 10);
if (val != 0 || errno == 0)
{
rlag_max = val;
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Hint: "
"max_slave_replication_lag=%d",
rlag_max)));
}
}
hint = hint->next;
}
}
if (rlag_max == MAX_RLAG_UNDEFINED) /*< no rlag max hint, use config */
{
rlag_max = rses_get_max_replication_lag(router_cli_ses);
}
succp = get_dcb(&target_dcb,
router_cli_ses,
BE_SLAVE,
named_server,
rlag_max);
}
else if (TARGET_IS_MASTER(route_target))
{
if (master_dcb == NULL)
{
succp = get_dcb(&master_dcb,
router_cli_ses,
BE_MASTER,
NULL,
MAX_RLAG_UNDEFINED);
}
target_dcb = master_dcb;
}
/** Lock router session */
/*if (!rses_begin_locked_router_action(router_cli_ses))
{
goto return_ret;
} */
/**
@ -1462,35 +1653,32 @@ static int routeQuery(
if (master_dcb == NULL)
{
succp = get_dcb(&master_dcb, router_cli_ses, BE_MASTER);
succp = get_dcb(&master_dcb, router_cli_ses, BE_MASTER, NULL, MAX_RLAG_UNDEFINED);
}
if (succp)
{
if ((ret = master_dcb->func.write(master_dcb, gwbuf_clone(querybuf))) == 1)
if (succp) /*< Have DCB of the target backend */
{
if ((ret = target_dcb->func.write(target_dcb, gwbuf_clone(querybuf))) == 1)
{
backend_ref_t* bref;
atomic_add(&inst->stats.n_master, 1);
atomic_add(&inst->stats.n_slave, 1);
/**
* Add one write response waiter to backend reference
* Add one query response waiter to backend reference
*/
bref = get_bref_from_dcb(router_cli_ses, master_dcb);
bref = get_bref_from_dcb(router_cli_ses, target_dcb);
bref_set_state(bref, BREF_QUERY_ACTIVE);
bref_set_state(bref, BREF_WAITING_RESULT);
bref_set_state(bref, BREF_WAITING_RESULT);
}
else
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Routing query \"%s\" failed.",
querystr)));
}
}
rses_end_locked_router_action(router_cli_ses);
ss_dassert(succp);
if (ret == 0)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Routing to master failed.")));
}
}
return_ret:
#if defined(SS_DEBUG)
@ -1514,7 +1702,7 @@ return_ret:
}
/** to be inline'd */
/**
* @node Acquires lock to router client session if it is not closed.
*
@ -1719,7 +1907,6 @@ static void clientReply (
if (LOG_IS_ENABLED(LOGFILE_ERROR) &&
MYSQL_IS_ERROR_PACKET(((uint8_t *)GWBUF_DATA(writebuf))))
{
SESSION* ses = backend_dcb->session;
uint8_t* buf =
(uint8_t *)GWBUF_DATA((scur->scmd_cur_cmd->my_sescmd_buf));
size_t len = MYSQL_GET_PACKET_LEN(buf);
@ -2044,7 +2231,9 @@ static bool select_connect_backend_servers(
#endif
/* assert with master_host */
ss_dassert(!master_connected ||
(master_host && ((*p_master_ref)->bref_backend->backend_server == master_host->backend_server) && SERVER_MASTER));
(master_host &&
((*p_master_ref)->bref_backend->backend_server == master_host->backend_server) &&
SERVER_MASTER));
/**
* Sort the pointer list to servers according to connection counts. As
* a consequence those backends having least connections are in the
@ -2122,8 +2311,8 @@ static bool select_connect_backend_servers(
{
/* check also for relay servers and don't take the master_host */
if (slaves_found < max_nslaves &&
(max_slave_rlag == -2 ||
(b->backend_server->rlag != -1 && /*< information currently not available */
(max_slave_rlag == MAX_RLAG_UNDEFINED ||
(b->backend_server->rlag != MAX_RLAG_NOT_AVAILABLE &&
b->backend_server->rlag <= max_slave_rlag)) &&
(SERVER_IS_SLAVE(b->backend_server) || SERVER_IS_RELAY_SERVER(b->backend_server)) &&
(master_host != NULL && (b->backend_server != master_host->backend_server)))
@ -2253,7 +2442,8 @@ static bool select_connect_backend_servers(
}
/* assert with master_host */
ss_dassert(!master_connected ||
(master_host && ((*p_master_ref)->bref_backend->backend_server == master_host->backend_server) && SERVER_MASTER));
(master_host && ((*p_master_ref)->bref_backend->backend_server == master_host->backend_server) &&
SERVER_MASTER));
#endif
/**
@ -2315,15 +2505,11 @@ static bool select_connect_backend_servers(
BACKEND* b = backend_ref[i].bref_backend;
if (BREF_IS_IN_USE((&backend_ref[i])))
{
backend_type_t btype = BACKEND_TYPE(b);
{
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Selected %s in \t%s:%d",
(btype == BE_MASTER ? "master" :
(btype == BE_SLAVE ? "slave" :
"unknown node type")),
STRSRVSTATUS(b->backend_server),
b->backend_server->name,
b->backend_server->port)));
}
@ -3164,7 +3350,6 @@ static bool route_session_write(
/** Lock router session */
if (!rses_begin_locked_router_action(router_cli_ses))
{
rses_property_done(prop);
succp = false;
goto return_succp;
}
@ -3234,6 +3419,7 @@ return_succp:
}
#if defined(NOT_USED)
static bool router_option_configured(
ROUTER_INSTANCE* router,
const char* optionstr,
@ -3410,6 +3596,11 @@ static bool handle_error_reply_client(
CHK_DCB(client_dcb);
client_dcb->func.write(client_dcb, errmsg);
}
else
{
while ((errmsg=gwbuf_consume(errmsg, GWBUF_LENGTH(errmsg))) != NULL)
;
}
succp = false; /** false because new servers aren's selected. */
return succp;
@ -3464,6 +3655,11 @@ static bool handle_error_new_connection(
client_dcb->func.write(client_dcb, errmsg);
bref_clear_state(bref, BREF_WAITING_RESULT);
}
else
{
while ((errmsg=gwbuf_consume(errmsg, GWBUF_LENGTH(errmsg))) != NULL)
;
}
bref_clear_state(bref, BREF_IN_USE);
bref_set_state(bref, BREF_CLOSED);
/**