
query_classifier.cc: A query can now be parsed outside query_classifier_get_type by calling the function parse_query. It creates a parsing_info_t struct, which is then added to the GWBUF that also holds the query. The parsing information follows the buffered query and is freed together with the query buffer in gwbuf_free. buffer.c: added parsing information to the gwbuf struct. modutil.c: added a function which returns the query from a GWBUF as a plain text string. readwritesplit.c: routeQuery now only calls query_classifier_get_type to get the query type instead of extracting the plain text query from the GWBUF buffer.
3627 lines
129 KiB
C
3627 lines
129 KiB
C
/*
|
|
* This file is distributed as part of the SkySQL Gateway. It is free
|
|
* software: you can redistribute it and/or modify it under the terms of the
|
|
* GNU General Public License as published by the Free Software Foundation,
|
|
* version 2.
|
|
*
|
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
|
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
|
|
* details.
|
|
*
|
|
* You should have received a copy of the GNU General Public License along with
|
|
* this program; if not, write to the Free Software Foundation, Inc., 51
|
|
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
|
*
|
|
* Copyright SkySQL Ab 2013
|
|
*/
|
|
#include <stdio.h>
|
|
#include <strings.h>
|
|
#include <string.h>
|
|
#include <stdlib.h>
|
|
#include <stdint.h>
|
|
|
|
#include <router.h>
|
|
#include <readwritesplit.h>
|
|
|
|
#include <mysql.h>
|
|
#include <skygw_utils.h>
|
|
#include <log_manager.h>
|
|
#include <query_classifier.h>
|
|
#include <dcb.h>
|
|
#include <spinlock.h>
|
|
#include <modinfo.h>
|
|
#include <modutil.h>
|
|
#include <mysql_client_server_protocol.h>
|
|
|
|
MODULE_INFO info = {
|
|
MODULE_API_ROUTER,
|
|
MODULE_BETA_RELEASE,
|
|
ROUTER_VERSION,
|
|
"A Read/Write splitting router for enhancement read scalability"
|
|
};
|
|
#if defined(SS_DEBUG)
|
|
# include <mysql_client_server_protocol.h>
|
|
#endif
|
|
|
|
|
|
extern int lm_enabled_logfiles_bitmask;
|
|
|
|
/**
|
|
* @file readwritesplit.c The entry points for the read/write query splitting
|
|
* router module.
|
|
*
|
|
* This file contains the entry points that comprise the API to the read write
|
|
* query splitting router.
|
|
* @verbatim
|
|
* Revision History
|
|
*
|
|
* Date Who Description
|
|
* 01/07/2013 Vilho Raatikka Initial implementation
|
|
* 15/07/2013 Massimiliano Pinto Added clientReply
|
|
* from master only in case of session change
|
|
* 17/07/2013 Massimiliano Pinto clientReply is now used by mysql_backend
|
|
* for all reply situations
|
|
* 18/07/2013 Massimiliano Pinto routeQuery now handles COM_QUIT
|
|
* as QUERY_TYPE_SESSION_WRITE
|
|
* 17/07/2014 Massimiliano Pinto Server connection counter is updated in closeSession
|
|
*
|
|
* @endverbatim
|
|
*/
|
|
|
|
static char *version_str = "V1.0.2";
|
|
|
|
static ROUTER* createInstance(SERVICE *service, char **options);
|
|
static void* newSession(ROUTER *instance, SESSION *session);
|
|
static void closeSession(ROUTER *instance, void *session);
|
|
static void freeSession(ROUTER *instance, void *session);
|
|
static int routeQuery(ROUTER *instance, void *session, GWBUF *queue);
|
|
static void diagnostic(ROUTER *instance, DCB *dcb);
|
|
|
|
static void clientReply(
|
|
ROUTER* instance,
|
|
void* router_session,
|
|
GWBUF* queue,
|
|
DCB* backend_dcb);
|
|
|
|
static void handleError(
|
|
ROUTER* instance,
|
|
void* router_session,
|
|
GWBUF* errmsgbuf,
|
|
DCB* backend_dcb,
|
|
error_action_t action,
|
|
bool* succp);
|
|
|
|
static void print_error_packet(ROUTER_CLIENT_SES* rses, GWBUF* buf, DCB* dcb);
|
|
static int router_get_servercount(ROUTER_INSTANCE* router);
|
|
static int rses_get_max_slavecount(ROUTER_CLIENT_SES* rses, int router_nservers);
|
|
static int rses_get_max_replication_lag(ROUTER_CLIENT_SES* rses);
|
|
static backend_ref_t* get_bref_from_dcb(ROUTER_CLIENT_SES* rses, DCB* dcb);
|
|
|
|
static uint8_t getCapabilities (ROUTER* inst, void* router_session);
|
|
|
|
#if defined(NOT_USED)
|
|
static bool router_option_configured(
|
|
ROUTER_INSTANCE* router,
|
|
const char* optionstr,
|
|
void* data);
|
|
#endif
|
|
|
|
#if defined(PREP_STMT_CACHING)
|
|
static prep_stmt_t* prep_stmt_init(prep_stmt_type_t type, void* id);
|
|
static void prep_stmt_done(prep_stmt_t* pstmt);
|
|
#endif /*< PREP_STMT_CACHING */
|
|
|
|
int bref_cmp_global_conn(
|
|
const void* bref1,
|
|
const void* bref2);
|
|
|
|
int bref_cmp_router_conn(
|
|
const void* bref1,
|
|
const void* bref2);
|
|
|
|
int bref_cmp_behind_master(
|
|
const void* bref1,
|
|
const void* bref2);
|
|
|
|
int bref_cmp_current_load(
|
|
const void* bref1,
|
|
const void* bref2);
|
|
|
|
/**
|
|
* The order of functions _must_ match with the order the select criteria are
|
|
* listed in select_criteria_t definition in readwritesplit.h
|
|
*/
|
|
int (*criteria_cmpfun[LAST_CRITERIA])(const void*, const void*)=
|
|
{
|
|
NULL,
|
|
bref_cmp_global_conn,
|
|
bref_cmp_router_conn,
|
|
bref_cmp_behind_master,
|
|
bref_cmp_current_load
|
|
};
|
|
|
|
static bool select_connect_backend_servers(
|
|
backend_ref_t** p_master_ref,
|
|
backend_ref_t* backend_ref,
|
|
int router_nservers,
|
|
int max_nslaves,
|
|
int max_rlag,
|
|
select_criteria_t select_criteria,
|
|
SESSION* session,
|
|
ROUTER_INSTANCE* router);
|
|
|
|
static bool get_dcb(
|
|
DCB** dcb,
|
|
ROUTER_CLIENT_SES* rses,
|
|
backend_type_t btype);
|
|
|
|
static void rwsplit_process_router_options(
|
|
ROUTER_INSTANCE* router,
|
|
char** options);
|
|
|
|
|
|
|
|
static ROUTER_OBJECT MyObject = {
|
|
createInstance,
|
|
newSession,
|
|
closeSession,
|
|
freeSession,
|
|
routeQuery,
|
|
diagnostic,
|
|
clientReply,
|
|
handleError,
|
|
getCapabilities
|
|
};
|
|
static bool rses_begin_locked_router_action(
|
|
ROUTER_CLIENT_SES* rses);
|
|
|
|
static void rses_end_locked_router_action(
|
|
ROUTER_CLIENT_SES* rses);
|
|
|
|
static void mysql_sescmd_done(
|
|
mysql_sescmd_t* sescmd);
|
|
|
|
static mysql_sescmd_t* mysql_sescmd_init (
|
|
rses_property_t* rses_prop,
|
|
GWBUF* sescmd_buf,
|
|
unsigned char packet_type,
|
|
ROUTER_CLIENT_SES* rses);
|
|
|
|
static rses_property_t* mysql_sescmd_get_property(
|
|
mysql_sescmd_t* scmd);
|
|
|
|
static rses_property_t* rses_property_init(
|
|
rses_property_type_t prop_type);
|
|
|
|
static void rses_property_add(
|
|
ROUTER_CLIENT_SES* rses,
|
|
rses_property_t* prop);
|
|
|
|
static void rses_property_done(
|
|
rses_property_t* prop);
|
|
|
|
static mysql_sescmd_t* rses_property_get_sescmd(
|
|
rses_property_t* prop);
|
|
|
|
static bool execute_sescmd_history(backend_ref_t* bref);
|
|
|
|
static bool execute_sescmd_in_backend(
|
|
backend_ref_t* backend_ref);
|
|
|
|
static void sescmd_cursor_reset(sescmd_cursor_t* scur);
|
|
|
|
static bool sescmd_cursor_history_empty(sescmd_cursor_t* scur);
|
|
|
|
static void sescmd_cursor_set_active(
|
|
sescmd_cursor_t* sescmd_cursor,
|
|
bool value);
|
|
|
|
static bool sescmd_cursor_is_active(
|
|
sescmd_cursor_t* sescmd_cursor);
|
|
|
|
static GWBUF* sescmd_cursor_clone_querybuf(
|
|
sescmd_cursor_t* scur);
|
|
|
|
static mysql_sescmd_t* sescmd_cursor_get_command(
|
|
sescmd_cursor_t* scur);
|
|
|
|
static bool sescmd_cursor_next(
|
|
sescmd_cursor_t* scur);
|
|
|
|
static GWBUF* sescmd_cursor_process_replies(GWBUF* replybuf, backend_ref_t* bref);
|
|
|
|
static void tracelog_routed_query(
|
|
ROUTER_CLIENT_SES* rses,
|
|
char* funcname,
|
|
backend_ref_t* bref,
|
|
GWBUF* buf);
|
|
|
|
static bool route_session_write(
|
|
ROUTER_CLIENT_SES* router_client_ses,
|
|
GWBUF* querybuf,
|
|
ROUTER_INSTANCE* inst,
|
|
unsigned char packet_type,
|
|
skygw_query_type_t qtype);
|
|
|
|
static void refreshInstance(
|
|
ROUTER_INSTANCE* router,
|
|
CONFIG_PARAMETER* param);
|
|
|
|
static void bref_clear_state(backend_ref_t* bref, bref_state_t state);
|
|
static void bref_set_state(backend_ref_t* bref, bref_state_t state);
|
|
static sescmd_cursor_t* backend_ref_get_sescmd_cursor (backend_ref_t* bref);
|
|
|
|
static int router_handle_state_switch(DCB* dcb, DCB_REASON reason, void* data);
|
|
static bool handle_error_new_connection(
|
|
ROUTER_INSTANCE* inst,
|
|
ROUTER_CLIENT_SES* rses,
|
|
DCB* backend_dcb,
|
|
GWBUF* errmsg);
|
|
static bool handle_error_reply_client(SESSION* ses, GWBUF* errmsg);
|
|
|
|
static BACKEND* get_root_master(
|
|
backend_ref_t* servers,
|
|
int router_nservers);
|
|
|
|
static bool have_enough_servers(
|
|
ROUTER_CLIENT_SES** rses,
|
|
const int nsrv,
|
|
int router_nsrv,
|
|
ROUTER_INSTANCE* router);
|
|
|
|
static SPINLOCK instlock;
|
|
static ROUTER_INSTANCE* instances;
|
|
|
|
/**
|
|
* Implementation of the mandatory version entry point
|
|
*
|
|
* @return version string of the module
|
|
*/
|
|
char *
|
|
version()
|
|
{
|
|
return version_str;
|
|
}
|
|
|
|
/**
|
|
* The module initialisation routine, called when the module
|
|
* is first loaded.
|
|
*/
|
|
void
|
|
ModuleInit()
|
|
{
|
|
LOGIF(LM, (skygw_log_write_flush(
|
|
LOGFILE_MESSAGE,
|
|
"Initializing statemend-based read/write split router module.")));
|
|
spinlock_init(&instlock);
|
|
instances = NULL;
|
|
}
|
|
|
|
/**
|
|
* The module entry point routine. It is this routine that
|
|
* must populate the structure that is referred to as the
|
|
* "module object", this is a structure with the set of
|
|
* external entry points for this module.
|
|
*
|
|
* @return The module object
|
|
*/
|
|
ROUTER_OBJECT* GetModuleObject()
|
|
{
|
|
return &MyObject;
|
|
}
|
|
|
|
static void refreshInstance(
|
|
ROUTER_INSTANCE* router,
|
|
CONFIG_PARAMETER* singleparam)
|
|
{
|
|
CONFIG_PARAMETER* param;
|
|
bool refresh_single;
|
|
|
|
if (singleparam != NULL)
|
|
{
|
|
param = singleparam;
|
|
refresh_single = true;
|
|
}
|
|
else
|
|
{
|
|
param = router->service->svc_config_param;
|
|
refresh_single = false;
|
|
}
|
|
|
|
while (param != NULL)
|
|
{
|
|
config_param_type_t paramtype;
|
|
|
|
paramtype = config_get_paramtype(param);
|
|
|
|
if (paramtype == COUNT_TYPE)
|
|
{
|
|
if (strncmp(param->name, "max_slave_connections", MAX_PARAM_LEN) == 0)
|
|
{
|
|
router->rwsplit_config.rw_max_slave_conn_percent = 0;
|
|
router->rwsplit_config.rw_max_slave_conn_count =
|
|
config_get_valint(param, NULL, paramtype);
|
|
}
|
|
else if (strncmp(param->name,
|
|
"max_slave_replication_lag",
|
|
MAX_PARAM_LEN) == 0)
|
|
{
|
|
router->rwsplit_config.rw_max_slave_replication_lag =
|
|
config_get_valint(param, NULL, paramtype);
|
|
}
|
|
}
|
|
else if (paramtype == PERCENT_TYPE)
|
|
{
|
|
if (strncmp(param->name, "max_slave_connections", MAX_PARAM_LEN) == 0)
|
|
{
|
|
router->rwsplit_config.rw_max_slave_conn_count = 0;
|
|
router->rwsplit_config.rw_max_slave_conn_percent =
|
|
config_get_valint(param, NULL, paramtype);
|
|
}
|
|
}
|
|
|
|
if (refresh_single)
|
|
{
|
|
break;
|
|
}
|
|
param = param->next;
|
|
}
|
|
|
|
#if defined(NOT_USED) /*< can't read monitor config parameters */
|
|
if ((*router->servers)->backend_server->rlag == -2)
|
|
{
|
|
rlag_enabled = false;
|
|
}
|
|
else
|
|
{
|
|
rlag_enabled = true;
|
|
}
|
|
/**
|
|
* If replication lag detection is not enabled the measure can't be
|
|
* used in slave selection.
|
|
*/
|
|
if (!rlag_enabled)
|
|
{
|
|
if (rlag_limited)
|
|
{
|
|
LOGIF(LE, (skygw_log_write_flush(
|
|
LOGFILE_ERROR,
|
|
"Warning : Configuration Failed, max_slave_replication_lag "
|
|
"is set to %d,\n\t\t but detect_replication_lag "
|
|
"is not enabled. Replication lag will not be checked.",
|
|
router->rwsplit_config.rw_max_slave_replication_lag)));
|
|
}
|
|
|
|
if (router->rwsplit_config.rw_slave_select_criteria ==
|
|
LEAST_BEHIND_MASTER)
|
|
{
|
|
LOGIF(LE, (skygw_log_write_flush(
|
|
LOGFILE_ERROR,
|
|
"Warning : Configuration Failed, router option "
|
|
"\n\t\t slave_selection_criteria=LEAST_BEHIND_MASTER "
|
|
"is specified, but detect_replication_lag "
|
|
"is not enabled.\n\t\t "
|
|
"slave_selection_criteria=%s will be used instead.",
|
|
STRCRITERIA(DEFAULT_CRITERIA))));
|
|
|
|
router->rwsplit_config.rw_slave_select_criteria =
|
|
DEFAULT_CRITERIA;
|
|
}
|
|
}
|
|
#endif /*< NOT_USED */
|
|
}
|
|
|
|
/**
|
|
* Create an instance of read/write statement router within the MaxScale.
|
|
*
|
|
*
|
|
* @param service The service this router is being create for
|
|
* @param options The options for this query router
|
|
*
|
|
* @return NULL in failure, pointer to router in success.
|
|
*/
|
|
static ROUTER *
|
|
createInstance(SERVICE *service, char **options)
|
|
{
|
|
ROUTER_INSTANCE* router;
|
|
SERVER* server;
|
|
int nservers;
|
|
int i;
|
|
CONFIG_PARAMETER* param;
|
|
char *weightby;
|
|
|
|
if ((router = calloc(1, sizeof(ROUTER_INSTANCE))) == NULL) {
|
|
return NULL;
|
|
}
|
|
router->service = service;
|
|
spinlock_init(&router->lock);
|
|
|
|
/** Calculate number of servers */
|
|
server = service->databases;
|
|
nservers = 0;
|
|
|
|
while (server != NULL)
|
|
{
|
|
nservers++;
|
|
server=server->nextdb;
|
|
}
|
|
router->servers = (BACKEND **)calloc(nservers + 1, sizeof(BACKEND *));
|
|
|
|
if (router->servers == NULL)
|
|
{
|
|
free(router);
|
|
return NULL;
|
|
}
|
|
/**
|
|
* Create an array of the backend servers in the router structure to
|
|
* maintain a count of the number of connections to each
|
|
* backend server.
|
|
*/
|
|
server = service->databases;
|
|
nservers= 0;
|
|
|
|
while (server != NULL) {
|
|
if ((router->servers[nservers] = malloc(sizeof(BACKEND))) == NULL)
|
|
{
|
|
/** clean up */
|
|
for (i = 0; i < nservers; i++) {
|
|
free(router->servers[i]);
|
|
}
|
|
free(router->servers);
|
|
free(router);
|
|
return NULL;
|
|
}
|
|
router->servers[nservers]->backend_server = server;
|
|
router->servers[nservers]->backend_conn_count = 0;
|
|
router->servers[nservers]->be_valid = false;
|
|
router->servers[nservers]->weight = 1000;
|
|
#if defined(SS_DEBUG)
|
|
router->servers[nservers]->be_chk_top = CHK_NUM_BACKEND;
|
|
router->servers[nservers]->be_chk_tail = CHK_NUM_BACKEND;
|
|
#endif
|
|
nservers += 1;
|
|
server = server->nextdb;
|
|
}
|
|
router->servers[nservers] = NULL;
|
|
|
|
/*
|
|
* If server weighting has been defined calculate the percentage
|
|
* of load that will be sent to each server. This is only used for
|
|
* calculating the least connections, either globally or within a
|
|
* service, or the numebr of current operations on a server.
|
|
*/
|
|
if ((weightby = serviceGetWeightingParameter(service)) != NULL)
|
|
{
|
|
int n, total = 0;
|
|
BACKEND *backend;
|
|
|
|
for (n = 0; router->servers[n]; n++)
|
|
{
|
|
backend = router->servers[n];
|
|
total += atoi(serverGetParameter(
|
|
backend->backend_server, weightby));
|
|
}
|
|
if (total == 0)
|
|
{
|
|
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
|
|
"WARNING: Weighting Parameter for service '%s' "
|
|
"will be ignored as no servers have values "
|
|
"for the parameter '%s'.\n",
|
|
service->name, weightby)));
|
|
}
|
|
else
|
|
{
|
|
for (n = 0; router->servers[n]; n++)
|
|
{
|
|
int perc;
|
|
backend = router->servers[n];
|
|
perc = (atoi(serverGetParameter(
|
|
backend->backend_server,
|
|
weightby)) * 1000) / total;
|
|
if (perc == 0)
|
|
perc = 1;
|
|
backend->weight = perc;
|
|
if (perc == 0)
|
|
{
|
|
LOGIF(LE, (skygw_log_write(
|
|
LOGFILE_ERROR,
|
|
"Server '%s' has no value "
|
|
"for weighting parameter '%s', "
|
|
"no queries will be routed to "
|
|
"this server.\n",
|
|
server->unique_name,
|
|
weightby)));
|
|
}
|
|
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* vraa : is this necessary for readwritesplit ?
|
|
* Option : where can a read go?
|
|
* - master (only)
|
|
* - slave (only)
|
|
* - joined (to both)
|
|
*
|
|
* Process the options
|
|
*/
|
|
router->bitmask = 0;
|
|
router->bitvalue = 0;
|
|
|
|
/** Call this before refreshInstance */
|
|
if (options)
|
|
{
|
|
rwsplit_process_router_options(router, options);
|
|
}
|
|
/**
|
|
* Set default value for max_slave_connections and for slave selection
|
|
* criteria. If parameter is set in config file max_slave_connections
|
|
* will be overwritten.
|
|
*/
|
|
router->rwsplit_config.rw_max_slave_conn_count = CONFIG_MAX_SLAVE_CONN;
|
|
|
|
if (router->rwsplit_config.rw_slave_select_criteria == UNDEFINED_CRITERIA)
|
|
{
|
|
router->rwsplit_config.rw_slave_select_criteria = DEFAULT_CRITERIA;
|
|
}
|
|
/**
|
|
* Copy all config parameters from service to router instance.
|
|
* Finally, copy version number to indicate that configs match.
|
|
*/
|
|
param = config_get_param(service->svc_config_param, "max_slave_connections");
|
|
|
|
if (param != NULL)
|
|
{
|
|
refreshInstance(router, param);
|
|
}
|
|
/**
|
|
* Read default value for slave replication lag upper limit and then
|
|
* configured value if it exists.
|
|
*/
|
|
router->rwsplit_config.rw_max_slave_replication_lag = CONFIG_MAX_SLAVE_RLAG;
|
|
param = config_get_param(service->svc_config_param, "max_slave_replication_lag");
|
|
|
|
if (param != NULL)
|
|
{
|
|
refreshInstance(router, param);
|
|
}
|
|
router->rwsplit_version = service->svc_config_version;
|
|
/**
|
|
* We have completed the creation of the router data, so now
|
|
* insert this router into the linked list of routers
|
|
* that have been created with this module.
|
|
*/
|
|
spinlock_acquire(&instlock);
|
|
router->next = instances;
|
|
instances = router;
|
|
spinlock_release(&instlock);
|
|
|
|
return (ROUTER *)router;
|
|
}
|
|
|
|
/**
|
|
* Associate a new session with this instance of the router.
|
|
*
|
|
* The session is used to store all the data required for a particular
|
|
* client connection.
|
|
*
|
|
* @param instance The router instance data
|
|
* @param session The session itself
|
|
* @return Session specific data for this session
|
|
*/
|
|
static void* newSession(
|
|
ROUTER* router_inst,
|
|
SESSION* session)
|
|
{
|
|
backend_ref_t* backend_ref; /*< array of backend references (DCB,BACKEND,cursor) */
|
|
backend_ref_t* master_ref = NULL; /*< pointer to selected master */
|
|
ROUTER_CLIENT_SES* client_rses = NULL;
|
|
ROUTER_INSTANCE* router = (ROUTER_INSTANCE *)router_inst;
|
|
bool succp;
|
|
int router_nservers = 0; /*< # of servers in total */
|
|
int max_nslaves; /*< max # of slaves used in this session */
|
|
int max_slave_rlag; /*< max allowed replication lag for any slave */
|
|
int i;
|
|
const int min_nservers = 1; /*< hard-coded for now */
|
|
|
|
client_rses = (ROUTER_CLIENT_SES *)calloc(1, sizeof(ROUTER_CLIENT_SES));
|
|
|
|
if (client_rses == NULL)
|
|
{
|
|
ss_dassert(false);
|
|
goto return_rses;
|
|
}
|
|
#if defined(SS_DEBUG)
|
|
client_rses->rses_chk_top = CHK_NUM_ROUTER_SES;
|
|
client_rses->rses_chk_tail = CHK_NUM_ROUTER_SES;
|
|
#endif
|
|
/**
|
|
* If service config has been changed, reload config from service to
|
|
* router instance first.
|
|
*/
|
|
spinlock_acquire(&router->lock);
|
|
|
|
if (router->service->svc_config_version > router->rwsplit_version)
|
|
{
|
|
/** re-read all parameters to rwsplit config structure */
|
|
refreshInstance(router, NULL); /*< scan through all parameters */
|
|
/** increment rwsplit router's config version number */
|
|
router->rwsplit_version = router->service->svc_config_version;
|
|
/** Read options */
|
|
rwsplit_process_router_options(router, router->service->routerOptions);
|
|
}
|
|
/** Copy config struct from router instance */
|
|
client_rses->rses_config = router->rwsplit_config;
|
|
|
|
spinlock_release(&router->lock);
|
|
/**
|
|
* Set defaults to session variables.
|
|
*/
|
|
client_rses->rses_autocommit_enabled = true;
|
|
client_rses->rses_transaction_active = false;
|
|
|
|
router_nservers = router_get_servercount(router);
|
|
|
|
if (!have_enough_servers(&client_rses,
|
|
min_nservers,
|
|
router_nservers,
|
|
router))
|
|
{
|
|
goto return_rses;
|
|
}
|
|
/**
|
|
* Create backend reference objects for this session.
|
|
*/
|
|
backend_ref = (backend_ref_t *)calloc(1, router_nservers*sizeof(backend_ref_t));
|
|
|
|
if (backend_ref == NULL)
|
|
{
|
|
/** log this */
|
|
free(client_rses);
|
|
free(backend_ref);
|
|
client_rses = NULL;
|
|
goto return_rses;
|
|
}
|
|
/**
|
|
* Initialize backend references with BACKEND ptr.
|
|
* Initialize session command cursors for each backend reference.
|
|
*/
|
|
for (i=0; i< router_nservers; i++)
|
|
{
|
|
#if defined(SS_DEBUG)
|
|
backend_ref[i].bref_chk_top = CHK_NUM_BACKEND_REF;
|
|
backend_ref[i].bref_chk_tail = CHK_NUM_BACKEND_REF;
|
|
backend_ref[i].bref_sescmd_cur.scmd_cur_chk_top = CHK_NUM_SESCMD_CUR;
|
|
backend_ref[i].bref_sescmd_cur.scmd_cur_chk_tail = CHK_NUM_SESCMD_CUR;
|
|
#endif
|
|
backend_ref[i].bref_state = 0;
|
|
backend_ref[i].bref_backend = router->servers[i];
|
|
/** store pointers to sescmd list to both cursors */
|
|
backend_ref[i].bref_sescmd_cur.scmd_cur_rses = client_rses;
|
|
backend_ref[i].bref_sescmd_cur.scmd_cur_active = false;
|
|
backend_ref[i].bref_sescmd_cur.scmd_cur_ptr_property =
|
|
&client_rses->rses_properties[RSES_PROP_TYPE_SESCMD];
|
|
backend_ref[i].bref_sescmd_cur.scmd_cur_cmd = NULL;
|
|
}
|
|
max_nslaves = rses_get_max_slavecount(client_rses, router_nservers);
|
|
max_slave_rlag = rses_get_max_replication_lag(client_rses);
|
|
|
|
spinlock_init(&client_rses->rses_lock);
|
|
client_rses->rses_backend_ref = backend_ref;
|
|
|
|
/**
|
|
* Find a backend servers to connect to.
|
|
* This command requires that rsession's lock is held.
|
|
*/
|
|
rses_begin_locked_router_action(client_rses);
|
|
|
|
succp = select_connect_backend_servers(&master_ref,
|
|
backend_ref,
|
|
router_nservers,
|
|
max_nslaves,
|
|
max_slave_rlag,
|
|
client_rses->rses_config.rw_slave_select_criteria,
|
|
session,
|
|
router);
|
|
|
|
rses_end_locked_router_action(client_rses);
|
|
|
|
/** Both Master and at least 1 slave must be found */
|
|
if (!succp) {
|
|
free(client_rses->rses_backend_ref);
|
|
free(client_rses);
|
|
client_rses = NULL;
|
|
goto return_rses;
|
|
}
|
|
/** Copy backend pointers to router session. */
|
|
client_rses->rses_master_ref = master_ref;
|
|
/* assert with master_host */
|
|
ss_dassert(master_ref && (master_ref->bref_backend->backend_server && SERVER_MASTER));
|
|
client_rses->rses_capabilities = RCAP_TYPE_STMT_INPUT;
|
|
client_rses->rses_backend_ref = backend_ref;
|
|
client_rses->rses_nbackends = router_nservers; /*< # of backend servers */
|
|
router->stats.n_sessions += 1;
|
|
|
|
/**
|
|
* Version is bigger than zero once initialized.
|
|
*/
|
|
atomic_add(&client_rses->rses_versno, 2);
|
|
ss_dassert(client_rses->rses_versno == 2);
|
|
/**
|
|
* Add this session to end of the list of active sessions in router.
|
|
*/
|
|
spinlock_acquire(&router->lock);
|
|
client_rses->next = router->connections;
|
|
router->connections = client_rses;
|
|
spinlock_release(&router->lock);
|
|
|
|
return_rses:
|
|
#if defined(SS_DEBUG)
|
|
if (client_rses != NULL)
|
|
{
|
|
CHK_CLIENT_RSES(client_rses);
|
|
}
|
|
#endif
|
|
return (void *)client_rses;
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
* Close a session with the router, this is the mechanism
|
|
* by which a router may cleanup data structure etc.
|
|
*
|
|
* @param instance The router instance data
|
|
* @param session The session being closed
|
|
*/
|
|
static void closeSession(
|
|
ROUTER* instance,
|
|
void* router_session)
|
|
{
|
|
ROUTER_CLIENT_SES* router_cli_ses;
|
|
backend_ref_t* backend_ref;
|
|
|
|
/**
|
|
* router session can be NULL if newSession failed and it is discarding
|
|
* its connections and DCB's.
|
|
*/
|
|
if (router_session == NULL)
|
|
{
|
|
return;
|
|
}
|
|
router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
|
|
CHK_CLIENT_RSES(router_cli_ses);
|
|
|
|
backend_ref = router_cli_ses->rses_backend_ref;
|
|
/**
|
|
* Lock router client session for secure read and update.
|
|
*/
|
|
if (!router_cli_ses->rses_closed &&
|
|
rses_begin_locked_router_action(router_cli_ses))
|
|
{
|
|
int i = 0;
|
|
/**
|
|
* session must be moved to SESSION_STATE_STOPPING state before
|
|
* router session is closed.
|
|
*/
|
|
#if defined(SS_DEBUG)
|
|
SESSION* ses = get_session_by_router_ses((void*)router_cli_ses);
|
|
|
|
ss_dassert(ses != NULL);
|
|
ss_dassert(ses->state == SESSION_STATE_STOPPING);
|
|
#endif
|
|
|
|
/**
|
|
* This sets router closed. Nobody is allowed to use router
|
|
* whithout checking this first.
|
|
*/
|
|
router_cli_ses->rses_closed = true;
|
|
|
|
for (i=0; i<router_cli_ses->rses_nbackends; i++)
|
|
{
|
|
backend_ref_t* bref = &backend_ref[i];
|
|
DCB* dcb = bref->bref_dcb;
|
|
|
|
/** Close those which had been connected */
|
|
if (BREF_IS_IN_USE(bref))
|
|
{
|
|
CHK_DCB(dcb);
|
|
/** Clean operation counter in bref and in SERVER */
|
|
while (BREF_IS_WAITING_RESULT(bref))
|
|
{
|
|
bref_clear_state(bref, BREF_WAITING_RESULT);
|
|
}
|
|
bref_clear_state(bref, BREF_IN_USE);
|
|
bref_set_state(bref, BREF_CLOSED);
|
|
/**
|
|
* closes protocol and dcb
|
|
*/
|
|
dcb_close(dcb);
|
|
/** decrease server current connection counters */
|
|
atomic_add(&bref->bref_backend->backend_server->stats.n_current, -1);
|
|
atomic_add(&bref->bref_backend->backend_conn_count, -1);
|
|
}
|
|
}
|
|
/** Unlock */
|
|
rses_end_locked_router_action(router_cli_ses);
|
|
}
|
|
}
|
|
|
|
static void freeSession(
|
|
ROUTER* router_instance,
|
|
void* router_client_session)
|
|
{
|
|
ROUTER_CLIENT_SES* router_cli_ses;
|
|
ROUTER_INSTANCE* router;
|
|
int i;
|
|
backend_ref_t* backend_ref;
|
|
|
|
router_cli_ses = (ROUTER_CLIENT_SES *)router_client_session;
|
|
router = (ROUTER_INSTANCE *)router_instance;
|
|
backend_ref = router_cli_ses->rses_backend_ref;
|
|
|
|
for (i=0; i<router_cli_ses->rses_nbackends; i++)
|
|
{
|
|
if (!BREF_IS_IN_USE((&backend_ref[i])))
|
|
{
|
|
continue;
|
|
}
|
|
}
|
|
spinlock_acquire(&router->lock);
|
|
|
|
if (router->connections == router_cli_ses) {
|
|
router->connections = router_cli_ses->next;
|
|
} else {
|
|
ROUTER_CLIENT_SES* ptr = router->connections;
|
|
|
|
while (ptr && ptr->next != router_cli_ses) {
|
|
ptr = ptr->next;
|
|
}
|
|
|
|
if (ptr) {
|
|
ptr->next = router_cli_ses->next;
|
|
}
|
|
}
|
|
spinlock_release(&router->lock);
|
|
|
|
/**
|
|
* For each property type, walk through the list, finalize properties
|
|
* and free the allocated memory.
|
|
*/
|
|
for (i=RSES_PROP_TYPE_FIRST; i<RSES_PROP_TYPE_COUNT; i++)
|
|
{
|
|
rses_property_t* p = router_cli_ses->rses_properties[i];
|
|
rses_property_t* q = p;
|
|
|
|
while (p != NULL)
|
|
{
|
|
q = p->rses_prop_next;
|
|
rses_property_done(p);
|
|
p = q;
|
|
}
|
|
}
|
|
/*
|
|
* We are no longer in the linked list, free
|
|
* all the memory and other resources associated
|
|
* to the client session.
|
|
*/
|
|
free(router_cli_ses->rses_backend_ref);
|
|
free(router_cli_ses);
|
|
return;
|
|
}
|
|
|
|
/**
|
|
* Provide a pointer to a suitable backend dcb.
|
|
* Detect failures in server statuses and reselect backends if necessary.
|
|
*/
|
|
static bool get_dcb(
|
|
DCB** p_dcb,
|
|
ROUTER_CLIENT_SES* rses,
|
|
backend_type_t btype)
|
|
{
|
|
backend_ref_t* backend_ref;
|
|
int smallest_nconn = -1;
|
|
int i;
|
|
bool succp = false;
|
|
BACKEND *master_host = NULL;
|
|
|
|
CHK_CLIENT_RSES(rses);
|
|
ss_dassert(p_dcb != NULL && *(p_dcb) == NULL);
|
|
|
|
if (p_dcb == NULL)
|
|
{
|
|
goto return_succp;
|
|
}
|
|
backend_ref = rses->rses_backend_ref;
|
|
|
|
/* get root master from availbal servers */
|
|
master_host = get_root_master(backend_ref, rses->rses_nbackends);
|
|
|
|
if (btype == BE_SLAVE)
|
|
{
|
|
for (i=0; i<rses->rses_nbackends; i++)
|
|
{
|
|
BACKEND* b = backend_ref[i].bref_backend;
|
|
/* check slave bit, also for relay servers (Master & Servers) */
|
|
if (BREF_IS_IN_USE((&backend_ref[i])) &&
|
|
(SERVER_IS_SLAVE(b->backend_server) || SERVER_IS_RELAY_SERVER(b->backend_server)) &&
|
|
(master_host != NULL && b->backend_server != master_host->backend_server) &&
|
|
(smallest_nconn == -1 ||
|
|
b->backend_conn_count < smallest_nconn))
|
|
{
|
|
*p_dcb = backend_ref[i].bref_dcb;
|
|
smallest_nconn = b->backend_conn_count;
|
|
succp = true;
|
|
ss_dassert(backend_ref[i].bref_dcb->state != DCB_STATE_ZOMBIE);
|
|
}
|
|
}
|
|
|
|
if (!succp)
|
|
{
|
|
backend_ref = rses->rses_master_ref;
|
|
|
|
if (BREF_IS_IN_USE(backend_ref))
|
|
{
|
|
*p_dcb = backend_ref->bref_dcb;
|
|
succp = true;
|
|
|
|
ss_dassert(backend_ref->bref_dcb->state != DCB_STATE_ZOMBIE);
|
|
|
|
ss_dassert(
|
|
(master_host && (backend_ref->bref_backend->backend_server == master_host->backend_server)) &&
|
|
smallest_nconn == -1);
|
|
|
|
LOGIF(LE, (skygw_log_write_flush(
|
|
LOGFILE_ERROR,
|
|
"Warning : No slaves connected nor "
|
|
"available. Choosing master %s:%d "
|
|
"instead.",
|
|
backend_ref->bref_backend->backend_server->name,
|
|
backend_ref->bref_backend->backend_server->port)));
|
|
}
|
|
}
|
|
ss_dassert(succp);
|
|
}
|
|
else if (btype == BE_MASTER)
|
|
{
|
|
for (i=0; i<rses->rses_nbackends; i++)
|
|
{
|
|
BACKEND* b = backend_ref[i].bref_backend;
|
|
|
|
if (BREF_IS_IN_USE((&backend_ref[i])) &&
|
|
(master_host && (b->backend_server == master_host->backend_server)))
|
|
{
|
|
*p_dcb = backend_ref[i].bref_dcb;
|
|
succp = true;
|
|
goto return_succp;
|
|
}
|
|
}
|
|
}
|
|
return_succp:
|
|
return succp;
|
|
}
|
|
|
|
/**
|
|
* The main routing entry, this is called with every packet that is
|
|
* received and has to be forwarded to the backend database.
|
|
*
|
|
* The routeQuery will make the routing decision based on the contents
|
|
* of the instance, session and the query itself in the queue. The
|
|
* data in the queue may not represent a complete query, it represents
|
|
* the data that has been received. The query router itself is responsible
|
|
* for buffering the partial query, a later call to the query router will
|
|
* contain the remainder, or part thereof of the query.
|
|
*
|
|
* @param instance The query router instance
|
|
* @param session The session associated with the client
|
|
* @param queue Gateway buffer queue with the packets received
|
|
*
|
|
* @return if succeed 1, otherwise 0
|
|
* If routeQuery fails, it means that router session has failed.
|
|
* In any tolerated failure, handleError is called and if necessary,
|
|
* an error message is sent to the client.
|
|
*
|
|
* For now, routeQuery doesn't tolerate errors, so any error will close
|
|
* the session. vraa 14.6.14
|
|
*/
|
|
static int routeQuery(
|
|
ROUTER* instance,
|
|
void* router_session,
|
|
GWBUF* querybuf)
|
|
{
|
|
skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN;
|
|
mysql_server_cmd_t packet_type;
|
|
uint8_t* packet;
|
|
int ret = 0;
|
|
DCB* master_dcb = NULL;
|
|
DCB* slave_dcb = NULL;
|
|
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance;
|
|
ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
|
|
bool rses_is_closed = false;
|
|
|
|
CHK_CLIENT_RSES(router_cli_ses);
|
|
|
|
/** Dirty read for quick check if router is closed. */
|
|
if (router_cli_ses->rses_closed)
|
|
{
|
|
rses_is_closed = true;
|
|
}
|
|
|
|
ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(querybuf));
|
|
|
|
packet = GWBUF_DATA(querybuf);
|
|
packet_type = packet[4];
|
|
|
|
if (rses_is_closed)
|
|
{
|
|
/**
|
|
* MYSQL_COM_QUIT may have sent by client and as a part of backend
|
|
* closing procedure.
|
|
*/
|
|
if (packet_type != MYSQL_COM_QUIT)
|
|
{
|
|
char* query_str = modutil_get_query(querybuf);
|
|
|
|
LOGIF(LE,
|
|
(skygw_log_write_flush(
|
|
LOGFILE_ERROR,
|
|
"Error: Failed to route %s:%s:\"%s\" to "
|
|
"backend server. %s.",
|
|
STRPACKETTYPE(packet_type),
|
|
STRQTYPE(qtype),
|
|
(query_str == NULL ? "(empty)" : query_str),
|
|
(rses_is_closed ? "Router was closed" :
|
|
"Router has no backend servers where to "
|
|
"route to"))));
|
|
free(querybuf);
|
|
}
|
|
goto return_ret;
|
|
}
|
|
inst->stats.n_queries++;
|
|
|
|
master_dcb = router_cli_ses->rses_master_ref->bref_dcb;
|
|
CHK_DCB(master_dcb);
|
|
|
|
switch(packet_type) {
|
|
case MYSQL_COM_QUIT: /*< 1 QUIT will close all sessions */
|
|
case MYSQL_COM_INIT_DB: /*< 2 DDL must go to the master */
|
|
case MYSQL_COM_REFRESH: /*< 7 - I guess this is session but not sure */
|
|
case MYSQL_COM_DEBUG: /*< 0d all servers dump debug info to stdout */
|
|
case MYSQL_COM_PING: /*< 0e all servers are pinged */
|
|
case MYSQL_COM_CHANGE_USER: /*< 11 all servers change it accordingly */
|
|
case MYSQL_COM_STMT_CLOSE: /*< free prepared statement */
|
|
case MYSQL_COM_STMT_SEND_LONG_DATA: /*< send data to column */
|
|
case MYSQL_COM_STMT_RESET: /*< resets the data of a prepared statement */
|
|
qtype = QUERY_TYPE_SESSION_WRITE;
|
|
break;
|
|
|
|
case MYSQL_COM_CREATE_DB: /**< 5 DDL must go to the master */
|
|
case MYSQL_COM_DROP_DB: /**< 6 DDL must go to the master */
|
|
qtype = QUERY_TYPE_WRITE;
|
|
break;
|
|
|
|
case MYSQL_COM_QUERY:
|
|
qtype = query_classifier_get_type(querybuf);
|
|
break;
|
|
|
|
case MYSQL_COM_STMT_PREPARE:
|
|
qtype = query_classifier_get_type(querybuf);
|
|
qtype |= QUERY_TYPE_PREPARE_STMT;
|
|
break;
|
|
|
|
case MYSQL_COM_STMT_EXECUTE:
|
|
/** Parsing is not needed for this type of packet */
|
|
qtype = QUERY_TYPE_EXEC_STMT;
|
|
break;
|
|
|
|
case MYSQL_COM_SHUTDOWN: /**< 8 where should shutdown be routed ? */
|
|
case MYSQL_COM_STATISTICS: /**< 9 ? */
|
|
case MYSQL_COM_PROCESS_INFO: /**< 0a ? */
|
|
case MYSQL_COM_CONNECT: /**< 0b ? */
|
|
case MYSQL_COM_PROCESS_KILL: /**< 0c ? */
|
|
case MYSQL_COM_TIME: /**< 0f should this be run in gateway ? */
|
|
case MYSQL_COM_DELAYED_INSERT: /**< 10 ? */
|
|
case MYSQL_COM_DAEMON: /**< 1d ? */
|
|
default:
|
|
break;
|
|
} /**< switch by packet type */
|
|
|
|
/**
|
|
* If autocommit is disabled or transaction is explicitly started
|
|
* transaction becomes active and master gets all statements until
|
|
* transaction is committed and autocommit is enabled again.
|
|
*/
|
|
if (router_cli_ses->rses_autocommit_enabled &&
|
|
QUERY_IS_TYPE(qtype, QUERY_TYPE_DISABLE_AUTOCOMMIT))
|
|
{
|
|
router_cli_ses->rses_autocommit_enabled = false;
|
|
|
|
if (!router_cli_ses->rses_transaction_active)
|
|
{
|
|
router_cli_ses->rses_transaction_active = true;
|
|
}
|
|
}
|
|
else if (!router_cli_ses->rses_transaction_active &&
|
|
QUERY_IS_TYPE(qtype, QUERY_TYPE_BEGIN_TRX))
|
|
{
|
|
router_cli_ses->rses_transaction_active = true;
|
|
}
|
|
/**
|
|
* Explicit COMMIT and ROLLBACK, implicit COMMIT.
|
|
*/
|
|
if (router_cli_ses->rses_autocommit_enabled &&
|
|
router_cli_ses->rses_transaction_active &&
|
|
(QUERY_IS_TYPE(qtype,QUERY_TYPE_COMMIT) ||
|
|
QUERY_IS_TYPE(qtype,QUERY_TYPE_ROLLBACK)))
|
|
{
|
|
router_cli_ses->rses_transaction_active = false;
|
|
}
|
|
else if (!router_cli_ses->rses_autocommit_enabled &&
|
|
QUERY_IS_TYPE(qtype, QUERY_TYPE_ENABLE_AUTOCOMMIT))
|
|
{
|
|
router_cli_ses->rses_autocommit_enabled = true;
|
|
router_cli_ses->rses_transaction_active = false;
|
|
}
|
|
/**
|
|
* Session update is always routed in the same way.
|
|
*/
|
|
if (QUERY_IS_TYPE(qtype, QUERY_TYPE_SESSION_WRITE) ||
|
|
QUERY_IS_TYPE(qtype, QUERY_TYPE_PREPARE_STMT) ||
|
|
QUERY_IS_TYPE(qtype, QUERY_TYPE_PREPARE_NAMED_STMT))
|
|
{
|
|
/**
|
|
* It is not sure if the session command in question requires
|
|
* response. Statement is examined in route_session_write.
|
|
*/
|
|
bool succp = route_session_write(
|
|
router_cli_ses,
|
|
gwbuf_clone(querybuf),
|
|
inst,
|
|
packet_type,
|
|
qtype);
|
|
|
|
if (succp)
|
|
{
|
|
ret = 1;
|
|
}
|
|
goto return_ret;
|
|
}
|
|
else if (QUERY_IS_TYPE(qtype, QUERY_TYPE_READ) &&
|
|
!router_cli_ses->rses_transaction_active)
|
|
{
|
|
bool succp;
|
|
|
|
LOGIF(LT, (skygw_log_write(
|
|
LOGFILE_TRACE,
|
|
"[%s]\tRead-only query, routing to Slave.",
|
|
inst->service->name)));
|
|
ss_dassert(QUERY_IS_TYPE(qtype, QUERY_TYPE_READ));
|
|
|
|
/** Lock router session */
|
|
if (!rses_begin_locked_router_action(router_cli_ses))
|
|
{
|
|
goto return_ret;
|
|
}
|
|
succp = get_dcb(&slave_dcb, router_cli_ses, BE_SLAVE);
|
|
|
|
if (succp)
|
|
{
|
|
if ((ret = slave_dcb->func.write(slave_dcb, gwbuf_clone(querybuf))) == 1)
|
|
{
|
|
backend_ref_t* bref;
|
|
|
|
atomic_add(&inst->stats.n_slave, 1);
|
|
/**
|
|
* Add one query response waiter to backend reference
|
|
*/
|
|
bref = get_bref_from_dcb(router_cli_ses, slave_dcb);
|
|
bref_set_state(bref, BREF_QUERY_ACTIVE);
|
|
bref_set_state(bref, BREF_WAITING_RESULT);
|
|
}
|
|
else
|
|
{
|
|
char* query_str = modutil_get_query(querybuf);
|
|
LOGIF(LE, (skygw_log_write_flush(
|
|
LOGFILE_ERROR,
|
|
"Error : Routing query \"%s\" failed.",
|
|
(query_str == NULL ? "not available" : query_str))));
|
|
free(query_str);
|
|
}
|
|
}
|
|
rses_end_locked_router_action(router_cli_ses);
|
|
|
|
ss_dassert(succp);
|
|
goto return_ret;
|
|
}
|
|
else
|
|
{
|
|
bool succp = true;
|
|
|
|
if (LOG_IS_ENABLED(LOGFILE_TRACE))
|
|
{
|
|
if (router_cli_ses->rses_transaction_active) /*< all to master */
|
|
{
|
|
LOGIF(LT, (skygw_log_write(
|
|
LOGFILE_TRACE,
|
|
"Transaction is active, routing to Master.")));
|
|
}
|
|
else
|
|
{
|
|
LOGIF(LT, (skygw_log_write(
|
|
LOGFILE_TRACE,
|
|
"Begin transaction, write or unspecified type, "
|
|
"routing to Master.")));
|
|
}
|
|
}
|
|
/** Lock router session */
|
|
if (!rses_begin_locked_router_action(router_cli_ses))
|
|
{
|
|
goto return_ret;
|
|
}
|
|
|
|
if (master_dcb == NULL)
|
|
{
|
|
succp = get_dcb(&master_dcb, router_cli_ses, BE_MASTER);
|
|
}
|
|
|
|
if (succp)
|
|
{
|
|
if ((ret = master_dcb->func.write(master_dcb, gwbuf_clone(querybuf))) == 1)
|
|
{
|
|
backend_ref_t* bref;
|
|
|
|
atomic_add(&inst->stats.n_master, 1);
|
|
|
|
/**
|
|
* Add one write response waiter to backend reference
|
|
*/
|
|
bref = get_bref_from_dcb(router_cli_ses, master_dcb);
|
|
bref_set_state(bref, BREF_QUERY_ACTIVE);
|
|
bref_set_state(bref, BREF_WAITING_RESULT);
|
|
}
|
|
}
|
|
rses_end_locked_router_action(router_cli_ses);
|
|
|
|
ss_dassert(succp);
|
|
|
|
if (ret == 0)
|
|
{
|
|
LOGIF(LE, (skygw_log_write_flush(
|
|
LOGFILE_ERROR,
|
|
"Error : Routing to master failed.")));
|
|
}
|
|
}
|
|
return_ret:
|
|
#if defined(SS_DEBUG)
|
|
{
|
|
char* canonical_query_str;
|
|
|
|
canonical_query_str = skygw_get_canonical(querybuf);
|
|
|
|
if (canonical_query_str != NULL)
|
|
{
|
|
LOGIF(LT, (skygw_log_write(
|
|
LOGFILE_TRACE,
|
|
"Canonical version: %s",
|
|
canonical_query_str)));
|
|
free(canonical_query_str);
|
|
}
|
|
}
|
|
#endif
|
|
gwbuf_free(querybuf);
|
|
return ret;
|
|
}
|
|
|
|
|
|
/** to be inline'd */
|
|
/**
|
|
* @node Acquires lock to router client session if it is not closed.
|
|
*
|
|
* Parameters:
|
|
* @param rses - in, use
|
|
*
|
|
*
|
|
* @return true if router session was not closed. If return value is true
|
|
* it means that router is locked, and must be unlocked later. False, if
|
|
* router was closed before lock was acquired.
|
|
*
|
|
*
|
|
* @details (write detailed description here)
|
|
*
|
|
*/
|
|
static bool rses_begin_locked_router_action(
|
|
ROUTER_CLIENT_SES* rses)
|
|
{
|
|
bool succp = false;
|
|
|
|
CHK_CLIENT_RSES(rses);
|
|
|
|
if (rses->rses_closed) {
|
|
|
|
goto return_succp;
|
|
}
|
|
spinlock_acquire(&rses->rses_lock);
|
|
if (rses->rses_closed) {
|
|
spinlock_release(&rses->rses_lock);
|
|
goto return_succp;
|
|
}
|
|
succp = true;
|
|
|
|
return_succp:
|
|
return succp;
|
|
}
|
|
|
|
/** to be inline'd */
|
|
/**
|
|
* @node Releases router client session lock.
|
|
*
|
|
* Parameters:
|
|
* @param rses - <usage>
|
|
* <description>
|
|
*
|
|
* @return void
|
|
*
|
|
*
|
|
* @details (write detailed description here)
|
|
*
|
|
*/
|
|
static void rses_end_locked_router_action(
|
|
ROUTER_CLIENT_SES* rses)
|
|
{
|
|
CHK_CLIENT_RSES(rses);
|
|
spinlock_release(&rses->rses_lock);
|
|
}
|
|
|
|
|
|
/**
|
|
* Diagnostics routine
|
|
*
|
|
* Print query router statistics to the DCB passed in
|
|
*
|
|
* @param instance The router instance
|
|
* @param dcb The DCB for diagnostic output
|
|
*/
|
|
static void
|
|
diagnostic(ROUTER *instance, DCB *dcb)
|
|
{
|
|
ROUTER_CLIENT_SES *router_cli_ses;
|
|
ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance;
|
|
int i = 0;
|
|
BACKEND *backend;
|
|
char *weightby;
|
|
|
|
spinlock_acquire(&router->lock);
|
|
router_cli_ses = router->connections;
|
|
while (router_cli_ses)
|
|
{
|
|
i++;
|
|
router_cli_ses = router_cli_ses->next;
|
|
}
|
|
spinlock_release(&router->lock);
|
|
|
|
dcb_printf(dcb,
|
|
"\tNumber of router sessions: %d\n",
|
|
router->stats.n_sessions);
|
|
dcb_printf(dcb,
|
|
"\tCurrent no. of router sessions: %d\n",
|
|
i);
|
|
dcb_printf(dcb,
|
|
"\tNumber of queries forwarded: %d\n",
|
|
router->stats.n_queries);
|
|
dcb_printf(dcb,
|
|
"\tNumber of queries forwarded to master: %d\n",
|
|
router->stats.n_master);
|
|
dcb_printf(dcb,
|
|
"\tNumber of queries forwarded to slave: %d\n",
|
|
router->stats.n_slave);
|
|
dcb_printf(dcb,
|
|
"\tNumber of queries forwarded to all: %d\n",
|
|
router->stats.n_all);
|
|
if ((weightby = serviceGetWeightingParameter(router->service)) != NULL)
|
|
{
|
|
dcb_printf(dcb,
|
|
"\tConnection distribution based on %s "
|
|
"server parameter.\n", weightby);
|
|
dcb_printf(dcb,
|
|
"\t\tServer Target %% Connections "
|
|
"Operations\n");
|
|
dcb_printf(dcb,
|
|
"\t\t Global Router\n");
|
|
for (i = 0; router->servers[i]; i++)
|
|
{
|
|
backend = router->servers[i];
|
|
dcb_printf(dcb,
|
|
"\t\t%-20s %3.1f%% %-6d %-6d %d\n",
|
|
backend->backend_server->unique_name,
|
|
(float)backend->weight / 10,
|
|
backend->backend_server->stats.n_current,
|
|
backend->backend_conn_count,
|
|
backend->backend_server->stats.n_current_ops);
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
/**
|
|
* Client Reply routine
|
|
*
|
|
* The routine will reply to client for session change with master server data
|
|
*
|
|
* @param instance The router instance
|
|
* @param router_session The router session
|
|
* @param backend_dcb The backend DCB
|
|
* @param queue The GWBUF with reply data
|
|
*/
|
|
static void clientReply (
|
|
ROUTER* instance,
|
|
void* router_session,
|
|
GWBUF* writebuf,
|
|
DCB* backend_dcb)
|
|
{
|
|
DCB* client_dcb;
|
|
ROUTER_CLIENT_SES* router_cli_ses;
|
|
sescmd_cursor_t* scur = NULL;
|
|
backend_ref_t* bref;
|
|
|
|
router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
|
|
CHK_CLIENT_RSES(router_cli_ses);
|
|
|
|
/**
|
|
* Lock router client session for secure read of router session members.
|
|
* Note that this could be done without lock by using version #
|
|
*/
|
|
if (!rses_begin_locked_router_action(router_cli_ses))
|
|
{
|
|
print_error_packet(router_cli_ses, writebuf, backend_dcb);
|
|
goto lock_failed;
|
|
}
|
|
/** Holding lock ensures that router session remains open */
|
|
ss_dassert(backend_dcb->session != NULL);
|
|
client_dcb = backend_dcb->session->client;
|
|
|
|
/** Unlock */
|
|
rses_end_locked_router_action(router_cli_ses);
|
|
/**
|
|
* 1. Check if backend received reply to sescmd.
|
|
* 2. Check sescmd's state whether OK_PACKET has been
|
|
* sent to client already and if not, lock property cursor,
|
|
* reply to client, and move property cursor forward. Finally
|
|
* release the lock.
|
|
* 3. If reply for this sescmd is sent, lock property cursor
|
|
* and
|
|
*/
|
|
if (client_dcb == NULL)
|
|
{
|
|
while ((writebuf = gwbuf_consume(
|
|
writebuf,
|
|
GWBUF_LENGTH(writebuf))) != NULL);
|
|
/** Log that client was closed before reply */
|
|
goto lock_failed;
|
|
}
|
|
/** Lock router session */
|
|
if (!rses_begin_locked_router_action(router_cli_ses))
|
|
{
|
|
/** Log to debug that router was closed */
|
|
goto lock_failed;
|
|
}
|
|
bref = get_bref_from_dcb(router_cli_ses, backend_dcb);
|
|
|
|
CHK_BACKEND_REF(bref);
|
|
scur = &bref->bref_sescmd_cur;
|
|
/**
|
|
* Active cursor means that reply is from session command
|
|
* execution.
|
|
*/
|
|
if (sescmd_cursor_is_active(scur))
|
|
{
|
|
if (LOG_IS_ENABLED(LOGFILE_ERROR) &&
|
|
MYSQL_IS_ERROR_PACKET(((uint8_t *)GWBUF_DATA(writebuf))))
|
|
{
|
|
SESSION* ses = backend_dcb->session;
|
|
uint8_t* buf =
|
|
(uint8_t *)GWBUF_DATA((scur->scmd_cur_cmd->my_sescmd_buf));
|
|
size_t len = MYSQL_GET_PACKET_LEN(buf);
|
|
char* cmdstr = (char *)malloc(len+1);
|
|
/** data+termination character == len */
|
|
snprintf(cmdstr, len, "%s", &buf[5]);
|
|
|
|
ss_dassert(len+4 == GWBUF_LENGTH(scur->scmd_cur_cmd->my_sescmd_buf));
|
|
|
|
LOGIF(LE, (skygw_log_write_flush(
|
|
LOGFILE_ERROR,
|
|
"Error : Failed to execute %s in %s:%d.",
|
|
cmdstr,
|
|
bref->bref_backend->backend_server->name,
|
|
bref->bref_backend->backend_server->port)));
|
|
|
|
free(cmdstr);
|
|
}
|
|
|
|
if (GWBUF_IS_TYPE_SESCMD_RESPONSE(writebuf))
|
|
{
|
|
/**
|
|
* Discard all those responses that have already been sent to
|
|
* the client. Return with buffer including response that
|
|
* needs to be sent to client or NULL.
|
|
*/
|
|
writebuf = sescmd_cursor_process_replies(writebuf, bref);
|
|
}
|
|
/**
|
|
* If response will be sent to client, decrease waiter count.
|
|
* This applies to session commands only. Counter decrement
|
|
* for other type of queries is done outside this block.
|
|
*/
|
|
if (writebuf != NULL && client_dcb != NULL)
|
|
{
|
|
/** Set response status as replied */
|
|
bref_clear_state(bref, BREF_WAITING_RESULT);
|
|
}
|
|
}
|
|
/**
|
|
* Clear BREF_QUERY_ACTIVE flag and decrease waiter counter.
|
|
* This applies for queries other than session commands.
|
|
*/
|
|
else if (BREF_IS_QUERY_ACTIVE(bref))
|
|
{
|
|
bref_clear_state(bref, BREF_QUERY_ACTIVE);
|
|
/** Set response status as replied */
|
|
bref_clear_state(bref, BREF_WAITING_RESULT);
|
|
}
|
|
|
|
if (writebuf != NULL && client_dcb != NULL)
|
|
{
|
|
/** Write reply to client DCB */
|
|
SESSION_ROUTE_REPLY(backend_dcb->session, writebuf);
|
|
}
|
|
/** Unlock router session */
|
|
rses_end_locked_router_action(router_cli_ses);
|
|
|
|
/** Lock router session */
|
|
if (!rses_begin_locked_router_action(router_cli_ses))
|
|
{
|
|
/** Log to debug that router was closed */
|
|
goto lock_failed;
|
|
}
|
|
/** There is one pending session command to be executed. */
|
|
if (sescmd_cursor_is_active(scur))
|
|
{
|
|
bool succp;
|
|
|
|
LOGIF(LT, (skygw_log_write(
|
|
LOGFILE_TRACE,
|
|
"Backend %s:%d processed reply and starts to execute "
|
|
"active cursor.",
|
|
bref->bref_backend->backend_server->name,
|
|
bref->bref_backend->backend_server->port)));
|
|
|
|
succp = execute_sescmd_in_backend(bref);
|
|
|
|
ss_dassert(succp);
|
|
}
|
|
/** Unlock router session */
|
|
rses_end_locked_router_action(router_cli_ses);
|
|
|
|
lock_failed:
|
|
return;
|
|
}
|
|
|
|
/** Compare nunmber of connections from this router in backend servers */
|
|
int bref_cmp_router_conn(
|
|
const void* bref1,
|
|
const void* bref2)
|
|
{
|
|
BACKEND* b1 = ((backend_ref_t *)bref1)->bref_backend;
|
|
BACKEND* b2 = ((backend_ref_t *)bref2)->bref_backend;
|
|
|
|
return ((1000 * b1->backend_conn_count) / b1->weight)
|
|
- ((1000 * b2->backend_conn_count) / b2->weight);
|
|
}
|
|
|
|
/** Compare nunmber of global connections in backend servers */
|
|
int bref_cmp_global_conn(
|
|
const void* bref1,
|
|
const void* bref2)
|
|
{
|
|
BACKEND* b1 = ((backend_ref_t *)bref1)->bref_backend;
|
|
BACKEND* b2 = ((backend_ref_t *)bref2)->bref_backend;
|
|
|
|
return ((1000 * b1->backend_server->stats.n_current) / b1->weight)
|
|
- ((1000 * b2->backend_server->stats.n_current) / b2->weight);
|
|
}
|
|
|
|
|
|
/** Compare relication lag between backend servers */
|
|
int bref_cmp_behind_master(
|
|
const void* bref1,
|
|
const void* bref2)
|
|
{
|
|
BACKEND* b1 = ((backend_ref_t *)bref1)->bref_backend;
|
|
BACKEND* b2 = ((backend_ref_t *)bref2)->bref_backend;
|
|
|
|
return ((b1->backend_server->rlag < b2->backend_server->rlag) ? -1 :
|
|
((b1->backend_server->rlag > b2->backend_server->rlag) ? 1 : 0));
|
|
}
|
|
|
|
/** Compare nunmber of current operations in backend servers */
|
|
int bref_cmp_current_load(
|
|
const void* bref1,
|
|
const void* bref2)
|
|
{
|
|
SERVER* s1 = ((backend_ref_t *)bref1)->bref_backend->backend_server;
|
|
SERVER* s2 = ((backend_ref_t *)bref2)->bref_backend->backend_server;
|
|
BACKEND* b1 = ((backend_ref_t *)bref1)->bref_backend;
|
|
BACKEND* b2 = ((backend_ref_t *)bref2)->bref_backend;
|
|
|
|
return ((1000 * s1->stats.n_current_ops) - b1->weight)
|
|
- ((1000 * s2->stats.n_current_ops) - b2->weight);
|
|
}
|
|
|
|
static void bref_clear_state(
|
|
backend_ref_t* bref,
|
|
bref_state_t state)
|
|
{
|
|
if (state != BREF_WAITING_RESULT)
|
|
{
|
|
bref->bref_state &= ~state;
|
|
}
|
|
else
|
|
{
|
|
int prev1;
|
|
int prev2;
|
|
|
|
/** Decrease waiter count */
|
|
prev1 = atomic_add(&bref->bref_num_result_wait, -1);
|
|
|
|
if (prev1 <= 0) {
|
|
atomic_add(&bref->bref_num_result_wait, 1);
|
|
}
|
|
else
|
|
{
|
|
/** Decrease global operation count */
|
|
prev2 = atomic_add(
|
|
&bref->bref_backend->backend_server->stats.n_current_ops, -1);
|
|
ss_dassert(prev2 > 0);
|
|
}
|
|
}
|
|
}
|
|
|
|
static void bref_set_state(
|
|
backend_ref_t* bref,
|
|
bref_state_t state)
|
|
{
|
|
if (state != BREF_WAITING_RESULT)
|
|
{
|
|
bref->bref_state |= state;
|
|
}
|
|
else
|
|
{
|
|
int prev1;
|
|
int prev2;
|
|
|
|
/** Increase waiter count */
|
|
prev1 = atomic_add(&bref->bref_num_result_wait, 1);
|
|
ss_dassert(prev1 >= 0);
|
|
|
|
/** Increase global operation count */
|
|
prev2 = atomic_add(
|
|
&bref->bref_backend->backend_server->stats.n_current_ops, 1);
|
|
ss_dassert(prev2 >= 0);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @node Search suitable backend servers from those of router instance.
|
|
*
|
|
* Parameters:
|
|
* @param p_master_ref - in, use, out
|
|
* Pointer to location where master's backend reference is to be stored.
|
|
* NULL is not allowed.
|
|
*
|
|
* @param backend_ref - in, use, out
|
|
* Pointer to backend server reference object array.
|
|
* NULL is not allowed.
|
|
*
|
|
* @param router_nservers - in, use
|
|
* Number of backend server pointers pointed to by b.
|
|
*
|
|
* @param max_nslaves - in, use
|
|
* Upper limit for the number of slaves. Configuration parameter or default.
|
|
*
|
|
* @param max_slave_rlag - in, use
|
|
* Maximum allowed replication lag for any slave. Configuration parameter or default.
|
|
*
|
|
* @param session - in, use
|
|
* MaxScale session pointer used when connection to backend is established.
|
|
*
|
|
* @param router - in, use
|
|
* Pointer to router instance. Used when server states are qualified.
|
|
*
|
|
* @return true, if at least one master and one slave was found.
|
|
*
|
|
*
|
|
* @details It is assumed that there is only one master among servers of
|
|
* a router instance. As a result, the first master found is chosen.
|
|
* There will possibly be more backend references than connected backends
|
|
* because only those in correct state are connected to.
|
|
*/
|
|
static bool select_connect_backend_servers(
|
|
backend_ref_t** p_master_ref,
|
|
backend_ref_t* backend_ref,
|
|
int router_nservers,
|
|
int max_nslaves,
|
|
int max_slave_rlag,
|
|
select_criteria_t select_criteria,
|
|
SESSION* session,
|
|
ROUTER_INSTANCE* router)
|
|
{
|
|
bool succp = true;
|
|
bool master_found;
|
|
bool master_connected;
|
|
int slaves_found = 0;
|
|
int slaves_connected = 0;
|
|
int i;
|
|
const int min_nslaves = 0; /*< not configurable at the time */
|
|
bool is_synced_master;
|
|
int (*p)(const void *, const void *);
|
|
BACKEND *master_host = NULL;
|
|
|
|
if (p_master_ref == NULL || backend_ref == NULL)
|
|
{
|
|
ss_dassert(FALSE);
|
|
succp = false;
|
|
goto return_succp;
|
|
}
|
|
|
|
/* get the root Master */
|
|
master_host = get_root_master(backend_ref, router_nservers);
|
|
|
|
/** Master is already chosen and connected. This is slave failure case */
|
|
if (*p_master_ref != NULL &&
|
|
BREF_IS_IN_USE((*p_master_ref)))
|
|
{
|
|
LOGIF(LD, (skygw_log_write(
|
|
LOGFILE_DEBUG,
|
|
"%lu [select_connect_backend_servers] Master %p fd %d found.",
|
|
pthread_self(),
|
|
(*p_master_ref)->bref_dcb,
|
|
(*p_master_ref)->bref_dcb->fd)));
|
|
|
|
master_found = true;
|
|
master_connected = true;
|
|
/* assert with master_host */
|
|
ss_dassert(master_host && ((*p_master_ref)->bref_backend->backend_server == master_host->backend_server) && SERVER_MASTER);
|
|
}
|
|
/** New session or master failure case */
|
|
else
|
|
{
|
|
LOGIF(LD, (skygw_log_write(
|
|
LOGFILE_DEBUG,
|
|
"%lu [select_connect_backend_servers] Session %p doesn't "
|
|
"currently have a master chosen. Proceeding to master "
|
|
"selection.",
|
|
pthread_self(),
|
|
session)));
|
|
|
|
master_found = false;
|
|
master_connected = false;
|
|
}
|
|
/** Check slave selection criteria and set compare function */
|
|
p = criteria_cmpfun[select_criteria];
|
|
|
|
if (p == NULL)
|
|
{
|
|
succp = false;
|
|
goto return_succp;
|
|
}
|
|
|
|
if (router->bitvalue != 0) /*< 'synced' is the only bitvalue in rwsplit */
|
|
{
|
|
is_synced_master = true;
|
|
}
|
|
else
|
|
{
|
|
is_synced_master = false;
|
|
}
|
|
|
|
#if defined(EXTRA_SS_DEBUG)
|
|
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, "Servers and conns before ordering:")));
|
|
|
|
for (i=0; i<router_nservers; i++)
|
|
{
|
|
BACKEND* b = backend_ref[i].bref_backend;
|
|
|
|
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE,
|
|
"master bref %p bref %p %d %s %d:%d",
|
|
*p_master_ref,
|
|
&backend_ref[i],
|
|
backend_ref[i].bref_state,
|
|
b->backend_server->name,
|
|
b->backend_server->port,
|
|
b->backend_conn_count)));
|
|
}
|
|
#endif
|
|
/* assert with master_host */
|
|
ss_dassert(!master_connected ||
|
|
(master_host && ((*p_master_ref)->bref_backend->backend_server == master_host->backend_server) && SERVER_MASTER));
|
|
/**
|
|
* Sort the pointer list to servers according to connection counts. As
|
|
* a consequence those backends having least connections are in the
|
|
* beginning of the list.
|
|
*/
|
|
qsort(backend_ref, (size_t)router_nservers, sizeof(backend_ref_t), p);
|
|
|
|
if (LOG_IS_ENABLED(LOGFILE_TRACE))
|
|
{
|
|
if (select_criteria == LEAST_GLOBAL_CONNECTIONS ||
|
|
select_criteria == LEAST_ROUTER_CONNECTIONS ||
|
|
select_criteria == LEAST_BEHIND_MASTER ||
|
|
select_criteria == LEAST_CURRENT_OPERATIONS)
|
|
{
|
|
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE,
|
|
"Servers and %s connection counts:",
|
|
select_criteria == LEAST_GLOBAL_CONNECTIONS ?
|
|
"all MaxScale" : "router")));
|
|
|
|
for (i=0; i<router_nservers; i++)
|
|
{
|
|
BACKEND* b = backend_ref[i].bref_backend;
|
|
|
|
switch(select_criteria) {
|
|
case LEAST_GLOBAL_CONNECTIONS:
|
|
LOGIF(LT, (skygw_log_write_flush(LOGFILE_TRACE,
|
|
"%s:%d MaxScale connections : %d",
|
|
b->backend_server->name,
|
|
b->backend_server->port,
|
|
b->backend_server->stats.n_current)));
|
|
break;
|
|
|
|
case LEAST_ROUTER_CONNECTIONS:
|
|
LOGIF(LT, (skygw_log_write_flush(LOGFILE_TRACE,
|
|
"%s:%d RWSplit connections : %d",
|
|
b->backend_server->name,
|
|
b->backend_server->port,
|
|
b->backend_conn_count)));
|
|
break;
|
|
|
|
case LEAST_CURRENT_OPERATIONS:
|
|
LOGIF(LT, (skygw_log_write_flush(LOGFILE_TRACE,
|
|
"%s:%d current operations : %d",
|
|
b->backend_server->name,
|
|
b->backend_server->port,
|
|
b->backend_server->stats.n_current_ops)));
|
|
break;
|
|
|
|
case LEAST_BEHIND_MASTER:
|
|
LOGIF(LT, (skygw_log_write_flush(LOGFILE_TRACE,
|
|
"%s:%d replication lag : %d",
|
|
b->backend_server->name,
|
|
b->backend_server->port,
|
|
b->backend_server->rlag)));
|
|
default:
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
} /*< log only */
|
|
|
|
/**
|
|
* Choose at least 1+min_nslaves (master and slave) and at most 1+max_nslaves
|
|
* servers from the sorted list. First master found is selected.
|
|
*/
|
|
for (i=0;
|
|
i<router_nservers && (slaves_connected < max_nslaves || !master_connected);
|
|
i++)
|
|
{
|
|
BACKEND* b = backend_ref[i].bref_backend;
|
|
|
|
if (SERVER_IS_RUNNING(b->backend_server) &&
|
|
((b->backend_server->status & router->bitmask) ==
|
|
router->bitvalue))
|
|
{
|
|
/* check also for relay servers and don't take the master_host */
|
|
if (slaves_found < max_nslaves &&
|
|
(max_slave_rlag == -2 ||
|
|
(b->backend_server->rlag != -1 && /*< information currently not available */
|
|
b->backend_server->rlag <= max_slave_rlag)) &&
|
|
(SERVER_IS_SLAVE(b->backend_server) || SERVER_IS_RELAY_SERVER(b->backend_server)) &&
|
|
(master_host != NULL && (b->backend_server != master_host->backend_server)))
|
|
{
|
|
slaves_found += 1;
|
|
|
|
/** Slave is already connected */
|
|
if (BREF_IS_IN_USE((&backend_ref[i])))
|
|
{
|
|
slaves_connected += 1;
|
|
}
|
|
/** New slave connection is taking place */
|
|
else
|
|
{
|
|
backend_ref[i].bref_dcb = dcb_connect(
|
|
b->backend_server,
|
|
session,
|
|
b->backend_server->protocol);
|
|
|
|
if (backend_ref[i].bref_dcb != NULL)
|
|
{
|
|
slaves_connected += 1;
|
|
/**
|
|
* Start executing session command
|
|
* history.
|
|
*/
|
|
execute_sescmd_history(&backend_ref[i]);
|
|
/**
|
|
* When server fails, this callback
|
|
* is called.
|
|
*/
|
|
dcb_add_callback(
|
|
backend_ref[i].bref_dcb,
|
|
DCB_REASON_NOT_RESPONDING,
|
|
&router_handle_state_switch,
|
|
(void *)&backend_ref[i]);
|
|
backend_ref[i].bref_state = 0;
|
|
bref_set_state(&backend_ref[i],
|
|
BREF_IN_USE);
|
|
/**
|
|
* Increase backend connection counter.
|
|
* Server's stats are _increased_ in
|
|
* dcb.c:dcb_alloc !
|
|
* But decreased in the calling function
|
|
* of dcb_close.
|
|
*/
|
|
atomic_add(&b->backend_conn_count, 1);
|
|
}
|
|
else
|
|
{
|
|
LOGIF(LE, (skygw_log_write_flush(
|
|
LOGFILE_ERROR,
|
|
"Error : Unable to establish "
|
|
"connection with slave %s:%d",
|
|
b->backend_server->name,
|
|
b->backend_server->port)));
|
|
/* handle connect error */
|
|
}
|
|
}
|
|
}
|
|
/* take the master_host for master */
|
|
else if (master_host &&
|
|
(b->backend_server == master_host->backend_server))
|
|
{
|
|
*p_master_ref = &backend_ref[i];
|
|
|
|
if (master_connected)
|
|
{
|
|
continue;
|
|
}
|
|
master_found = true;
|
|
|
|
backend_ref[i].bref_dcb = dcb_connect(
|
|
b->backend_server,
|
|
session,
|
|
b->backend_server->protocol);
|
|
|
|
if (backend_ref[i].bref_dcb != NULL)
|
|
{
|
|
master_connected = true;
|
|
/**
|
|
* When server fails, this callback
|
|
* is called.
|
|
*/
|
|
dcb_add_callback(
|
|
backend_ref[i].bref_dcb,
|
|
DCB_REASON_NOT_RESPONDING,
|
|
&router_handle_state_switch,
|
|
(void *)&backend_ref[i]);
|
|
|
|
backend_ref[i].bref_state = 0;
|
|
bref_set_state(&backend_ref[i],
|
|
BREF_IN_USE);
|
|
/** Increase backend connection counters */
|
|
atomic_add(&b->backend_conn_count, 1);
|
|
}
|
|
else
|
|
{
|
|
succp = false;
|
|
LOGIF(LE, (skygw_log_write_flush(
|
|
LOGFILE_ERROR,
|
|
"Error : Unable to establish "
|
|
"connection with master %s:%d",
|
|
b->backend_server->name,
|
|
b->backend_server->port)));
|
|
/** handle connect error */
|
|
}
|
|
}
|
|
}
|
|
} /*< for */
|
|
|
|
#if defined(EXTRA_SS_DEBUG)
|
|
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, "Servers and conns after ordering:")));
|
|
|
|
for (i=0; i<router_nservers; i++)
|
|
{
|
|
BACKEND* b = backend_ref[i].bref_backend;
|
|
|
|
LOGIF(LT, (skygw_log_write_flush(LOGFILE_TRACE,
|
|
"master bref %p bref %p %d %s %d:%d",
|
|
*p_master_ref,
|
|
&backend_ref[i],
|
|
backend_ref[i].bref_state,
|
|
b->backend_server->name,
|
|
b->backend_server->port,
|
|
b->backend_conn_count)));
|
|
}
|
|
/* assert with master_host */
|
|
ss_dassert(!master_connected ||
|
|
(master_host && ((*p_master_ref)->bref_backend->backend_server == master_host->backend_server) && SERVER_MASTER));
|
|
#endif
|
|
|
|
/**
|
|
* Successful cases
|
|
*/
|
|
if (master_connected &&
|
|
slaves_connected >= min_nslaves &&
|
|
slaves_connected <= max_nslaves)
|
|
{
|
|
succp = true;
|
|
|
|
if (slaves_connected == 0 && slaves_found > 0)
|
|
{
|
|
LOGIF(LE, (skygw_log_write(
|
|
LOGFILE_ERROR,
|
|
"Warning : Couldn't connect to any of the %d "
|
|
"slaves. Routing to %s only.",
|
|
slaves_found,
|
|
(is_synced_master ? "Galera nodes" : "Master"))));
|
|
|
|
LOGIF(LM, (skygw_log_write(
|
|
LOGFILE_MESSAGE,
|
|
"* Warning : Couldn't connect to any of the %d "
|
|
"slaves. Routing to %s only.",
|
|
slaves_found,
|
|
(is_synced_master ? "Galera nodes" : "Master"))));
|
|
}
|
|
else if (slaves_found == 0)
|
|
{
|
|
LOGIF(LE, (skygw_log_write(
|
|
LOGFILE_ERROR,
|
|
"Warning : Couldn't find any slaves from existing "
|
|
"%d servers. Routing to %s only.",
|
|
router_nservers,
|
|
(is_synced_master ? "Galera nodes" : "Master"))));
|
|
|
|
LOGIF(LM, (skygw_log_write(
|
|
LOGFILE_MESSAGE,
|
|
"* Warning : Couldn't find any slaves from existing "
|
|
"%d servers. Routing to %s only.",
|
|
router_nservers,
|
|
(is_synced_master ? "Galera nodes" : "Master"))));
|
|
}
|
|
else if (slaves_connected < max_nslaves)
|
|
{
|
|
LOGIF(LT, (skygw_log_write_flush(
|
|
LOGFILE_TRACE,
|
|
"Note : Couldn't connect to maximum number of "
|
|
"slaves. Connected successfully to %d slaves "
|
|
"of %d of them.",
|
|
slaves_connected,
|
|
slaves_found)));
|
|
}
|
|
|
|
if (LOG_IS_ENABLED(LT))
|
|
{
|
|
for (i=0; i<router_nservers; i++)
|
|
{
|
|
BACKEND* b = backend_ref[i].bref_backend;
|
|
|
|
if (BREF_IS_IN_USE((&backend_ref[i])))
|
|
{
|
|
backend_type_t btype = BACKEND_TYPE(b);
|
|
|
|
LOGIF(LT, (skygw_log_write(
|
|
LOGFILE_TRACE,
|
|
"Selected %s in \t%s:%d",
|
|
(btype == BE_MASTER ? "master" :
|
|
(btype == BE_SLAVE ? "slave" :
|
|
"unknown node type")),
|
|
b->backend_server->name,
|
|
b->backend_server->port)));
|
|
}
|
|
} /* for */
|
|
}
|
|
}
|
|
/**
|
|
* Failure cases
|
|
*/
|
|
else
|
|
{
|
|
succp = false;
|
|
|
|
if (!master_found)
|
|
{
|
|
LOGIF(LE, (skygw_log_write(
|
|
LOGFILE_ERROR,
|
|
"Error : Couldn't find suitable %s from %d "
|
|
"candidates.",
|
|
(is_synced_master ? "Galera node" : "Master"),
|
|
router_nservers)));
|
|
|
|
LOGIF(LM, (skygw_log_write(
|
|
LOGFILE_MESSAGE,
|
|
"Error : Couldn't find suitable %s from %d "
|
|
"candidates.",
|
|
(is_synced_master ? "Galera node" : "Master"),
|
|
router_nservers)));
|
|
|
|
LOGIF(LT, (skygw_log_write(
|
|
LOGFILE_TRACE,
|
|
"Error : Couldn't find suitable %s from %d "
|
|
"candidates.",
|
|
(is_synced_master ? "Galera node" : "Master"),
|
|
router_nservers)));
|
|
}
|
|
else if (!master_connected)
|
|
{
|
|
LOGIF(LE, (skygw_log_write(
|
|
LOGFILE_ERROR,
|
|
"Error : Couldn't connect to any %s although "
|
|
"there exists at least one %s node in the "
|
|
"cluster.",
|
|
(is_synced_master ? "Galera node" : "Master"),
|
|
(is_synced_master ? "Galera node" : "Master"))));
|
|
|
|
LOGIF(LM, (skygw_log_write(
|
|
LOGFILE_MESSAGE,
|
|
"Error : Couldn't connect to any %s although "
|
|
"there exists at least one %s node in the "
|
|
"cluster.",
|
|
(is_synced_master ? "Galera node" : "Master"),
|
|
(is_synced_master ? "Galera node" : "Master"))));
|
|
|
|
LOGIF(LT, (skygw_log_write(
|
|
LOGFILE_TRACE,
|
|
"Error : Couldn't connect to any %s although "
|
|
"there exists at least one %s node in the "
|
|
"cluster.",
|
|
(is_synced_master ? "Galera node" : "Master"),
|
|
(is_synced_master ? "Galera node" : "Master"))));
|
|
}
|
|
|
|
if (slaves_connected < min_nslaves)
|
|
{
|
|
LOGIF(LE, (skygw_log_write(
|
|
LOGFILE_ERROR,
|
|
"Error : Couldn't establish required amount of "
|
|
"slave connections for router session.")));
|
|
|
|
LOGIF(LM, (skygw_log_write(
|
|
LOGFILE_MESSAGE,
|
|
"Error : Couldn't establish required amount of "
|
|
"slave connections for router session.")));
|
|
}
|
|
|
|
/** Clean up connections */
|
|
for (i=0; i<router_nservers; i++)
|
|
{
|
|
if (BREF_IS_IN_USE((&backend_ref[i])))
|
|
{
|
|
ss_dassert(backend_ref[i].bref_backend->backend_conn_count > 0);
|
|
|
|
/** disconnect opened connections */
|
|
dcb_close(backend_ref[i].bref_dcb);
|
|
bref_clear_state(&backend_ref[i], BREF_IN_USE);
|
|
/** Decrease backend's connection counter. */
|
|
atomic_add(&backend_ref[i].bref_backend->backend_conn_count, -1);
|
|
}
|
|
}
|
|
master_connected = false;
|
|
slaves_connected = 0;
|
|
}
|
|
return_succp:
|
|
|
|
return succp;
|
|
}
|
|
|
|
|
|
/**
|
|
* Create a generic router session property strcture.
|
|
*/
|
|
static rses_property_t* rses_property_init(
|
|
rses_property_type_t prop_type)
|
|
{
|
|
rses_property_t* prop;
|
|
|
|
prop = (rses_property_t*)calloc(1, sizeof(rses_property_t));
|
|
if (prop == NULL)
|
|
{
|
|
goto return_prop;
|
|
}
|
|
prop->rses_prop_type = prop_type;
|
|
#if defined(SS_DEBUG)
|
|
prop->rses_prop_chk_top = CHK_NUM_ROUTER_PROPERTY;
|
|
prop->rses_prop_chk_tail = CHK_NUM_ROUTER_PROPERTY;
|
|
#endif
|
|
|
|
return_prop:
|
|
CHK_RSES_PROP(prop);
|
|
return prop;
|
|
}
|
|
|
|
/**
|
|
* Property is freed at the end of router client session.
|
|
*/
|
|
static void rses_property_done(
|
|
rses_property_t* prop)
|
|
{
|
|
CHK_RSES_PROP(prop);
|
|
|
|
switch (prop->rses_prop_type) {
|
|
case RSES_PROP_TYPE_SESCMD:
|
|
mysql_sescmd_done(&prop->rses_prop_data.sescmd);
|
|
break;
|
|
default:
|
|
LOGIF(LD, (skygw_log_write(
|
|
LOGFILE_DEBUG,
|
|
"%lu [rses_property_done] Unknown property type %d "
|
|
"in property %p",
|
|
pthread_self(),
|
|
prop->rses_prop_type,
|
|
prop)));
|
|
|
|
ss_dassert(false);
|
|
break;
|
|
}
|
|
free(prop);
|
|
}
|
|
|
|
/**
|
|
* Add property to the router_client_ses structure's rses_properties
|
|
* array. The slot is determined by the type of property.
|
|
* In each slot there is a list of properties of similar type.
|
|
*
|
|
* Router client session must be locked.
|
|
*/
|
|
static void rses_property_add(
|
|
ROUTER_CLIENT_SES* rses,
|
|
rses_property_t* prop)
|
|
{
|
|
rses_property_t* p;
|
|
|
|
CHK_CLIENT_RSES(rses);
|
|
CHK_RSES_PROP(prop);
|
|
ss_dassert(SPINLOCK_IS_LOCKED(&rses->rses_lock));
|
|
|
|
prop->rses_prop_rsession = rses;
|
|
p = rses->rses_properties[prop->rses_prop_type];
|
|
|
|
if (p == NULL)
|
|
{
|
|
rses->rses_properties[prop->rses_prop_type] = prop;
|
|
}
|
|
else
|
|
{
|
|
while (p->rses_prop_next != NULL)
|
|
{
|
|
p = p->rses_prop_next;
|
|
}
|
|
p->rses_prop_next = prop;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Router session must be locked.
|
|
* Return session command pointer if succeed, NULL if failed.
|
|
*/
|
|
static mysql_sescmd_t* rses_property_get_sescmd(
|
|
rses_property_t* prop)
|
|
{
|
|
mysql_sescmd_t* sescmd;
|
|
|
|
CHK_RSES_PROP(prop);
|
|
ss_dassert(prop->rses_prop_rsession == NULL ||
|
|
SPINLOCK_IS_LOCKED(&prop->rses_prop_rsession->rses_lock));
|
|
|
|
sescmd = &prop->rses_prop_data.sescmd;
|
|
|
|
if (sescmd != NULL)
|
|
{
|
|
CHK_MYSQL_SESCMD(sescmd);
|
|
}
|
|
return sescmd;
|
|
}
|
|
|
|
/**
|
|
static void rses_begin_locked_property_action(
|
|
rses_property_t* prop)
|
|
{
|
|
CHK_RSES_PROP(prop);
|
|
spinlock_acquire(&prop->rses_prop_lock);
|
|
}
|
|
|
|
static void rses_end_locked_property_action(
|
|
rses_property_t* prop)
|
|
{
|
|
CHK_RSES_PROP(prop);
|
|
spinlock_release(&prop->rses_prop_lock);
|
|
}
|
|
*/
|
|
|
|
/**
|
|
* Create session command property.
|
|
*/
|
|
static mysql_sescmd_t* mysql_sescmd_init (
|
|
rses_property_t* rses_prop,
|
|
GWBUF* sescmd_buf,
|
|
unsigned char packet_type,
|
|
ROUTER_CLIENT_SES* rses)
|
|
{
|
|
mysql_sescmd_t* sescmd;
|
|
|
|
CHK_RSES_PROP(rses_prop);
|
|
/** Can't call rses_property_get_sescmd with uninitialized sescmd */
|
|
sescmd = &rses_prop->rses_prop_data.sescmd;
|
|
sescmd->my_sescmd_prop = rses_prop; /*< reference to owning property */
|
|
#if defined(SS_DEBUG)
|
|
sescmd->my_sescmd_chk_top = CHK_NUM_MY_SESCMD;
|
|
sescmd->my_sescmd_chk_tail = CHK_NUM_MY_SESCMD;
|
|
#endif
|
|
/** Set session command buffer */
|
|
sescmd->my_sescmd_buf = sescmd_buf;
|
|
sescmd->my_sescmd_packet_type = packet_type;
|
|
|
|
return sescmd;
|
|
}
|
|
|
|
|
|
static void mysql_sescmd_done(
|
|
mysql_sescmd_t* sescmd)
|
|
{
|
|
CHK_RSES_PROP(sescmd->my_sescmd_prop);
|
|
gwbuf_free(sescmd->my_sescmd_buf);
|
|
memset(sescmd, 0, sizeof(mysql_sescmd_t));
|
|
}
|
|
|
|
/**
|
|
* All cases where backend message starts at least with one response to session
|
|
* command are handled here.
|
|
* Read session commands from property list. If command is already replied,
|
|
* discard packet. Else send reply to client. In both cases move cursor forward
|
|
* until all session command replies are handled.
|
|
*
|
|
* Cases that are expected to happen and which are handled:
|
|
* s = response not yet replied to client, S = already replied response,
|
|
* q = query
|
|
* 1. q+ for example : select * from mysql.user
|
|
* 2. s+ for example : set autocommit=1
|
|
* 3. S+
|
|
* 4. sq+
|
|
* 5. Sq+
|
|
* 6. Ss+
|
|
* 7. Ss+q+
|
|
* 8. S+q+
|
|
* 9. s+q+
|
|
*/
|
|
static GWBUF* sescmd_cursor_process_replies(
|
|
GWBUF* replybuf,
|
|
backend_ref_t* bref)
|
|
{
|
|
mysql_sescmd_t* scmd;
|
|
sescmd_cursor_t* scur;
|
|
|
|
scur = &bref->bref_sescmd_cur;
|
|
ss_dassert(SPINLOCK_IS_LOCKED(&(scur->scmd_cur_rses->rses_lock)));
|
|
scmd = sescmd_cursor_get_command(scur);
|
|
|
|
CHK_GWBUF(replybuf);
|
|
|
|
/**
|
|
* Walk through packets in the message and the list of session
|
|
* commands.
|
|
*/
|
|
while (scmd != NULL && replybuf != NULL)
|
|
{
|
|
/** Faster backend has already responded to client : discard */
|
|
if (scmd->my_sescmd_is_replied)
|
|
{
|
|
bool last_packet = false;
|
|
|
|
CHK_GWBUF(replybuf);
|
|
|
|
while (!last_packet)
|
|
{
|
|
int buflen;
|
|
|
|
buflen = GWBUF_LENGTH(replybuf);
|
|
last_packet = GWBUF_IS_TYPE_RESPONSE_END(replybuf);
|
|
/** discard packet */
|
|
replybuf = gwbuf_consume(replybuf, buflen);
|
|
}
|
|
/** Set response status received */
|
|
bref_clear_state(bref, BREF_WAITING_RESULT);
|
|
}
|
|
/** Response is in the buffer and it will be sent to client. */
|
|
else if (replybuf != NULL)
|
|
{
|
|
/** Mark the rest session commands as replied */
|
|
scmd->my_sescmd_is_replied = true;
|
|
}
|
|
|
|
if (sescmd_cursor_next(scur))
|
|
{
|
|
scmd = sescmd_cursor_get_command(scur);
|
|
}
|
|
else
|
|
{
|
|
scmd = NULL;
|
|
/** All session commands are replied */
|
|
scur->scmd_cur_active = false;
|
|
}
|
|
}
|
|
ss_dassert(replybuf == NULL || *scur->scmd_cur_ptr_property == NULL);
|
|
|
|
return replybuf;
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
* Get the address of current session command.
|
|
*
|
|
* Router session must be locked */
|
|
static mysql_sescmd_t* sescmd_cursor_get_command(
|
|
sescmd_cursor_t* scur)
|
|
{
|
|
mysql_sescmd_t* scmd;
|
|
|
|
ss_dassert(SPINLOCK_IS_LOCKED(&(scur->scmd_cur_rses->rses_lock)));
|
|
scur->scmd_cur_cmd = rses_property_get_sescmd(*scur->scmd_cur_ptr_property);
|
|
|
|
CHK_MYSQL_SESCMD(scur->scmd_cur_cmd);
|
|
|
|
scmd = scur->scmd_cur_cmd;
|
|
|
|
return scmd;
|
|
}
|
|
|
|
/** router must be locked */
|
|
static bool sescmd_cursor_is_active(
|
|
sescmd_cursor_t* sescmd_cursor)
|
|
{
|
|
bool succp;
|
|
ss_dassert(SPINLOCK_IS_LOCKED(&sescmd_cursor->scmd_cur_rses->rses_lock));
|
|
|
|
succp = sescmd_cursor->scmd_cur_active;
|
|
return succp;
|
|
}
|
|
|
|
/** router must be locked */
|
|
static void sescmd_cursor_set_active(
|
|
sescmd_cursor_t* sescmd_cursor,
|
|
bool value)
|
|
{
|
|
ss_dassert(SPINLOCK_IS_LOCKED(&sescmd_cursor->scmd_cur_rses->rses_lock));
|
|
/** avoid calling unnecessarily */
|
|
ss_dassert(sescmd_cursor->scmd_cur_active != value);
|
|
sescmd_cursor->scmd_cur_active = value;
|
|
}
|
|
|
|
/**
|
|
* Clone session command's command buffer.
|
|
* Router session must be locked
|
|
*/
|
|
static GWBUF* sescmd_cursor_clone_querybuf(
|
|
sescmd_cursor_t* scur)
|
|
{
|
|
GWBUF* buf;
|
|
ss_dassert(scur->scmd_cur_cmd != NULL);
|
|
|
|
buf = gwbuf_clone(scur->scmd_cur_cmd->my_sescmd_buf);
|
|
|
|
CHK_GWBUF(buf);
|
|
return buf;
|
|
}
|
|
|
|
static bool sescmd_cursor_history_empty(
|
|
sescmd_cursor_t* scur)
|
|
{
|
|
bool succp;
|
|
|
|
CHK_SESCMD_CUR(scur);
|
|
|
|
if (scur->scmd_cur_rses->rses_properties[RSES_PROP_TYPE_SESCMD] == NULL)
|
|
{
|
|
succp = true;
|
|
}
|
|
else
|
|
{
|
|
succp = false;
|
|
}
|
|
|
|
return succp;
|
|
}
|
|
|
|
|
|
static void sescmd_cursor_reset(
|
|
sescmd_cursor_t* scur)
|
|
{
|
|
ROUTER_CLIENT_SES* rses;
|
|
CHK_SESCMD_CUR(scur);
|
|
CHK_CLIENT_RSES(scur->scmd_cur_rses);
|
|
rses = scur->scmd_cur_rses;
|
|
|
|
scur->scmd_cur_ptr_property = &rses->rses_properties[RSES_PROP_TYPE_SESCMD];
|
|
|
|
CHK_RSES_PROP((*scur->scmd_cur_ptr_property));
|
|
scur->scmd_cur_active = false;
|
|
scur->scmd_cur_cmd = &(*scur->scmd_cur_ptr_property)->rses_prop_data.sescmd;
|
|
}
|
|
|
|
static bool execute_sescmd_history(
|
|
backend_ref_t* bref)
|
|
{
|
|
bool succp;
|
|
sescmd_cursor_t* scur;
|
|
CHK_BACKEND_REF(bref);
|
|
|
|
scur = &bref->bref_sescmd_cur;
|
|
CHK_SESCMD_CUR(scur);
|
|
|
|
if (!sescmd_cursor_history_empty(scur))
|
|
{
|
|
sescmd_cursor_reset(scur);
|
|
succp = execute_sescmd_in_backend(bref);
|
|
}
|
|
else
|
|
{
|
|
succp = true;
|
|
}
|
|
|
|
return succp;
|
|
}
|
|
|
|
/**
|
|
* If session command cursor is passive, sends the command to backend for
|
|
* execution.
|
|
*
|
|
* Returns true if command was sent or added successfully to the queue.
|
|
* Returns false if command sending failed or if there are no pending session
|
|
* commands.
|
|
*
|
|
* Router session must be locked.
|
|
*/
|
|
static bool execute_sescmd_in_backend(
|
|
backend_ref_t* backend_ref)
|
|
{
|
|
DCB* dcb;
|
|
bool succp;
|
|
int rc = 0;
|
|
sescmd_cursor_t* scur;
|
|
|
|
if (BREF_IS_CLOSED(backend_ref))
|
|
{
|
|
succp = false;
|
|
goto return_succp;
|
|
}
|
|
dcb = backend_ref->bref_dcb;
|
|
|
|
CHK_DCB(dcb);
|
|
CHK_BACKEND_REF(backend_ref);
|
|
|
|
/**
|
|
* Get cursor pointer and copy of command buffer to cursor.
|
|
*/
|
|
scur = &backend_ref->bref_sescmd_cur;
|
|
|
|
/** Return if there are no pending ses commands */
|
|
if (sescmd_cursor_get_command(scur) == NULL)
|
|
{
|
|
succp = false;
|
|
LOGIF(LT, (skygw_log_write_flush(
|
|
LOGFILE_TRACE,
|
|
"Cursor had no pending session commands.")));
|
|
|
|
goto return_succp;
|
|
}
|
|
|
|
if (!sescmd_cursor_is_active(scur))
|
|
{
|
|
/** Cursor is left active when function returns. */
|
|
sescmd_cursor_set_active(scur, true);
|
|
}
|
|
#if defined(SS_DEBUG)
|
|
LOGIF(LT, tracelog_routed_query(scur->scmd_cur_rses,
|
|
"execute_sescmd_in_backend",
|
|
backend_ref,
|
|
sescmd_cursor_clone_querybuf(scur)));
|
|
|
|
{
|
|
GWBUF* tmpbuf = sescmd_cursor_clone_querybuf(scur);
|
|
uint8_t* ptr = GWBUF_DATA(tmpbuf);
|
|
unsigned char cmd = MYSQL_GET_COMMAND(ptr);
|
|
|
|
LOGIF(LD, (skygw_log_write(
|
|
LOGFILE_DEBUG,
|
|
"%lu [execute_sescmd_in_backend] Just before write, fd "
|
|
"%d : cmd %s.",
|
|
pthread_self(),
|
|
dcb->fd,
|
|
STRPACKETTYPE(cmd))));
|
|
gwbuf_free(tmpbuf);
|
|
}
|
|
#endif /*< SS_DEBUG */
|
|
switch (scur->scmd_cur_cmd->my_sescmd_packet_type) {
|
|
case MYSQL_COM_CHANGE_USER:
|
|
rc = dcb->func.auth(
|
|
dcb,
|
|
NULL,
|
|
dcb->session,
|
|
sescmd_cursor_clone_querybuf(scur));
|
|
break;
|
|
|
|
case MYSQL_COM_QUERY:
|
|
case MYSQL_COM_INIT_DB:
|
|
default:
|
|
/**
|
|
* Mark session command buffer, it triggers writing
|
|
* MySQL command to protocol
|
|
*/
|
|
gwbuf_set_type(scur->scmd_cur_cmd->my_sescmd_buf, GWBUF_TYPE_SESCMD);
|
|
rc = dcb->func.write(
|
|
dcb,
|
|
sescmd_cursor_clone_querybuf(scur));
|
|
break;
|
|
}
|
|
LOGIF(LT, (skygw_log_write_flush(
|
|
LOGFILE_TRACE,
|
|
"%lu [execute_sescmd_in_backend] Routed %s cmd %p.",
|
|
pthread_self(),
|
|
STRPACKETTYPE(scur->scmd_cur_cmd->my_sescmd_packet_type),
|
|
scur->scmd_cur_cmd)));
|
|
|
|
if (rc == 1)
|
|
{
|
|
succp = true;
|
|
}
|
|
else
|
|
{
|
|
succp = false;
|
|
}
|
|
return_succp:
|
|
return succp;
|
|
}
|
|
|
|
|
|
/**
|
|
* Moves cursor to next property and copied address of its sescmd to cursor.
|
|
* Current propery must be non-null.
|
|
* If current property is the last on the list, *scur->scmd_ptr_property == NULL
|
|
*
|
|
* Router session must be locked
|
|
*/
|
|
static bool sescmd_cursor_next(
|
|
sescmd_cursor_t* scur)
|
|
{
|
|
bool succp = false;
|
|
rses_property_t* prop_curr;
|
|
rses_property_t* prop_next;
|
|
|
|
ss_dassert(scur != NULL);
|
|
ss_dassert(*(scur->scmd_cur_ptr_property) != NULL);
|
|
ss_dassert(SPINLOCK_IS_LOCKED(
|
|
&(*(scur->scmd_cur_ptr_property))->rses_prop_rsession->rses_lock));
|
|
|
|
/** Illegal situation */
|
|
if (scur == NULL ||
|
|
*scur->scmd_cur_ptr_property == NULL ||
|
|
scur->scmd_cur_cmd == NULL)
|
|
{
|
|
/** Log error */
|
|
goto return_succp;
|
|
}
|
|
prop_curr = *(scur->scmd_cur_ptr_property);
|
|
|
|
CHK_MYSQL_SESCMD(scur->scmd_cur_cmd);
|
|
ss_dassert(prop_curr == mysql_sescmd_get_property(scur->scmd_cur_cmd));
|
|
CHK_RSES_PROP(prop_curr);
|
|
|
|
/** Copy address of pointer to next property */
|
|
scur->scmd_cur_ptr_property = &(prop_curr->rses_prop_next);
|
|
prop_next = *scur->scmd_cur_ptr_property;
|
|
ss_dassert(prop_next == *(scur->scmd_cur_ptr_property));
|
|
|
|
|
|
/** If there is a next property move forward */
|
|
if (prop_next != NULL)
|
|
{
|
|
CHK_RSES_PROP(prop_next);
|
|
CHK_RSES_PROP((*(scur->scmd_cur_ptr_property)));
|
|
|
|
/** Get pointer to next property's sescmd */
|
|
scur->scmd_cur_cmd = rses_property_get_sescmd(prop_next);
|
|
|
|
ss_dassert(prop_next == scur->scmd_cur_cmd->my_sescmd_prop);
|
|
CHK_MYSQL_SESCMD(scur->scmd_cur_cmd);
|
|
CHK_RSES_PROP(scur->scmd_cur_cmd->my_sescmd_prop);
|
|
}
|
|
else
|
|
{
|
|
/** No more properties, can't proceed. */
|
|
goto return_succp;
|
|
}
|
|
|
|
if (scur->scmd_cur_cmd != NULL)
|
|
{
|
|
succp = true;
|
|
}
|
|
else
|
|
{
|
|
ss_dassert(false); /*< Log error, sescmd shouldn't be NULL */
|
|
}
|
|
return_succp:
|
|
return succp;
|
|
}
|
|
|
|
static rses_property_t* mysql_sescmd_get_property(
|
|
mysql_sescmd_t* scmd)
|
|
{
|
|
CHK_MYSQL_SESCMD(scmd);
|
|
return scmd->my_sescmd_prop;
|
|
}
|
|
|
|
static void tracelog_routed_query(
|
|
ROUTER_CLIENT_SES* rses,
|
|
char* funcname,
|
|
backend_ref_t* bref,
|
|
GWBUF* buf)
|
|
{
|
|
uint8_t* packet = GWBUF_DATA(buf);
|
|
unsigned char packet_type = packet[4];
|
|
size_t len;
|
|
size_t buflen = GWBUF_LENGTH(buf);
|
|
char* querystr;
|
|
char* startpos = (char *)&packet[5];
|
|
BACKEND* b;
|
|
backend_type_t be_type;
|
|
DCB* dcb;
|
|
|
|
CHK_BACKEND_REF(bref);
|
|
b = bref->bref_backend;
|
|
CHK_BACKEND(b);
|
|
dcb = bref->bref_dcb;
|
|
CHK_DCB(dcb);
|
|
|
|
be_type = BACKEND_TYPE(b);
|
|
|
|
if (GWBUF_IS_TYPE_MYSQL(buf))
|
|
{
|
|
len = packet[0];
|
|
len += 256*packet[1];
|
|
len += 256*256*packet[2];
|
|
|
|
if (packet_type == '\x03')
|
|
{
|
|
querystr = (char *)malloc(len);
|
|
memcpy(querystr, startpos, len-1);
|
|
querystr[len-1] = '\0';
|
|
LOGIF(LT, (skygw_log_write_flush(
|
|
LOGFILE_TRACE,
|
|
"%lu [%s] %d bytes long buf, \"%s\" -> %s:%d %s dcb %p",
|
|
pthread_self(),
|
|
funcname,
|
|
buflen,
|
|
querystr,
|
|
b->backend_server->name,
|
|
b->backend_server->port,
|
|
STRBETYPE(be_type),
|
|
dcb)));
|
|
free(querystr);
|
|
}
|
|
else if (packet_type == '\x22' ||
|
|
packet_type == 0x22 ||
|
|
packet_type == '\x26' ||
|
|
packet_type == 0x26 ||
|
|
true)
|
|
{
|
|
querystr = (char *)malloc(len);
|
|
memcpy(querystr, startpos, len-1);
|
|
querystr[len-1] = '\0';
|
|
LOGIF(LT, (skygw_log_write_flush(
|
|
LOGFILE_TRACE,
|
|
"%lu [%s] %d bytes long buf, \"%s\" -> %s:%d %s dcb %p",
|
|
pthread_self(),
|
|
funcname,
|
|
buflen,
|
|
querystr,
|
|
b->backend_server->name,
|
|
b->backend_server->port,
|
|
STRBETYPE(be_type),
|
|
dcb)));
|
|
free(querystr);
|
|
}
|
|
}
|
|
gwbuf_free(buf);
|
|
}
|
|
|
|
|
|
/**
|
|
* Return rc, rc < 0 if router session is closed. rc == 0 if there are no
|
|
* capabilities specified, rc > 0 when there are capabilities.
|
|
*/
|
|
static uint8_t getCapabilities (
|
|
ROUTER* inst,
|
|
void* router_session)
|
|
{
|
|
ROUTER_CLIENT_SES* rses = (ROUTER_CLIENT_SES *)router_session;
|
|
uint8_t rc;
|
|
|
|
if (!rses_begin_locked_router_action(rses))
|
|
{
|
|
rc = 0xff;
|
|
goto return_rc;
|
|
}
|
|
rc = rses->rses_capabilities;
|
|
|
|
rses_end_locked_router_action(rses);
|
|
|
|
return_rc:
|
|
return rc;
|
|
}
|
|
|
|
/**
|
|
* Execute in backends used by current router session.
|
|
* Save session variable commands to router session property
|
|
* struct. Thus, they can be replayed in backends which are
|
|
* started and joined later.
|
|
*
|
|
* Suppress redundant OK packets sent by backends.
|
|
*
|
|
* The first OK packet is replied to the client.
|
|
* Return true if succeed, false is returned if router session was closed or
|
|
* if execute_sescmd_in_backend failed.
|
|
*/
|
|
static bool route_session_write(
|
|
ROUTER_CLIENT_SES* router_cli_ses,
|
|
GWBUF* querybuf,
|
|
ROUTER_INSTANCE* inst,
|
|
unsigned char packet_type,
|
|
skygw_query_type_t qtype)
|
|
{
|
|
bool succp;
|
|
rses_property_t* prop;
|
|
backend_ref_t* backend_ref;
|
|
int i;
|
|
|
|
LOGIF(LT, (skygw_log_write(
|
|
LOGFILE_TRACE,
|
|
"Session write, query type\t%s, packet type %s, "
|
|
"routing to all servers.",
|
|
STRQTYPE(qtype),
|
|
STRPACKETTYPE(packet_type))));
|
|
|
|
backend_ref = router_cli_ses->rses_backend_ref;
|
|
|
|
/**
|
|
* These are one-way messages and server doesn't respond to them.
|
|
* Therefore reply processing is unnecessary and session
|
|
* command property is not needed. It is just routed to all available
|
|
* backends.
|
|
*/
|
|
if (packet_type == MYSQL_COM_STMT_SEND_LONG_DATA ||
|
|
packet_type == MYSQL_COM_QUIT ||
|
|
packet_type == MYSQL_COM_STMT_CLOSE)
|
|
{
|
|
int rc;
|
|
|
|
succp = true;
|
|
|
|
/** Lock router session */
|
|
if (!rses_begin_locked_router_action(router_cli_ses))
|
|
{
|
|
succp = false;
|
|
goto return_succp;
|
|
}
|
|
|
|
for (i=0; i<router_cli_ses->rses_nbackends; i++)
|
|
{
|
|
DCB* dcb = backend_ref[i].bref_dcb;
|
|
|
|
if (BREF_IS_IN_USE((&backend_ref[i])))
|
|
{
|
|
rc = dcb->func.write(dcb, gwbuf_clone(querybuf));
|
|
|
|
if (rc != 1)
|
|
{
|
|
succp = false;
|
|
}
|
|
}
|
|
}
|
|
rses_end_locked_router_action(router_cli_ses);
|
|
gwbuf_free(querybuf);
|
|
goto return_succp;
|
|
}
|
|
/** Lock router session */
|
|
if (!rses_begin_locked_router_action(router_cli_ses))
|
|
{
|
|
rses_property_done(prop);
|
|
succp = false;
|
|
goto return_succp;
|
|
}
|
|
/**
|
|
* Additional reference is created to querybuf to
|
|
* prevent it from being released before properties
|
|
* are cleaned up as a part of router sessionclean-up.
|
|
*/
|
|
prop = rses_property_init(RSES_PROP_TYPE_SESCMD);
|
|
mysql_sescmd_init(prop, querybuf, packet_type, router_cli_ses);
|
|
|
|
/** Add sescmd property to router client session */
|
|
rses_property_add(router_cli_ses, prop);
|
|
|
|
for (i=0; i<router_cli_ses->rses_nbackends; i++)
|
|
{
|
|
if (BREF_IS_IN_USE((&backend_ref[i])))
|
|
{
|
|
sescmd_cursor_t* scur;
|
|
|
|
scur = backend_ref_get_sescmd_cursor(&backend_ref[i]);
|
|
|
|
/**
|
|
* Add one waiter to backend reference.
|
|
*/
|
|
bref_set_state(get_bref_from_dcb(router_cli_ses,
|
|
backend_ref[i].bref_dcb),
|
|
BREF_WAITING_RESULT);
|
|
/**
|
|
* Start execution if cursor is not already executing.
|
|
* Otherwise, cursor will execute pending commands
|
|
* when it completes with previous commands.
|
|
*/
|
|
if (sescmd_cursor_is_active(scur))
|
|
{
|
|
succp = true;
|
|
|
|
LOGIF(LT, (skygw_log_write(
|
|
LOGFILE_TRACE,
|
|
"Backend %s:%d already executing sescmd.",
|
|
backend_ref[i].bref_backend->backend_server->name,
|
|
backend_ref[i].bref_backend->backend_server->port)));
|
|
}
|
|
else
|
|
{
|
|
succp = execute_sescmd_in_backend(&backend_ref[i]);
|
|
|
|
if (!succp)
|
|
{
|
|
LOGIF(LE, (skygw_log_write_flush(
|
|
LOGFILE_ERROR,
|
|
"Error : Failed to execute session "
|
|
"command in %s:%d",
|
|
backend_ref[i].bref_backend->backend_server->name,
|
|
backend_ref[i].bref_backend->backend_server->port)));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
/** Unlock router session */
|
|
rses_end_locked_router_action(router_cli_ses);
|
|
|
|
atomic_add(&inst->stats.n_all, 1);
|
|
|
|
return_succp:
|
|
return succp;
|
|
}
|
|
|
|
#if defined(NOT_USED)
/**
 * Check whether the router options of the service contain
 * slave_selection_criteria with the given value.
 *
 * The option strings are only read, never modified. Scanning stops at the
 * first option that has no '=' in it.
 *
 * @param router        Router instance whose service options are scanned
 * @param optionstr     Not used; the option name is fixed to
 *                      "slave_selection_criteria"
 * @param data          Pointer to the select_criteria_t value to look for
 *
 * @return true if a matching option was found, otherwise false.
 */
static bool router_option_configured(
        ROUTER_INSTANCE* router,
        const char*      optionstr,
        void*            data)
{
        static const char optname[] = "slave_selection_criteria";
        bool              succp = false;
        char**            option;

        option = router->service->routerOptions;

        while (option != NULL && *option != NULL)
        {
                char*  value;
                size_t namelen;

                if ((value = strchr(*option, '=')) == NULL)
                {
                        break;
                }
                namelen = (size_t)(value - *option);
                value++;

                if (namelen == sizeof(optname)-1 &&
                    strncmp(*option, optname, namelen) == 0)
                {
                        if (GET_SELECT_CRITERIA(value) == *(select_criteria_t *)data)
                        {
                                succp = true;
                                break;
                        }
                }
                option++;
        }
        return succp;
}
#endif /*< NOT_USED */
|
|
|
|
static void rwsplit_process_router_options(
|
|
ROUTER_INSTANCE* router,
|
|
char** options)
|
|
{
|
|
int i;
|
|
char* value;
|
|
select_criteria_t c;
|
|
|
|
for (i = 0; options[i]; i++)
|
|
{
|
|
if ((value = strchr(options[i], '=')) == NULL)
|
|
{
|
|
LOGIF(LE, (skygw_log_write(
|
|
LOGFILE_ERROR, "Warning : Unsupported "
|
|
"router option \"%s\" for "
|
|
"readwritesplit router.",
|
|
options[i])));
|
|
}
|
|
else
|
|
{
|
|
*value = 0;
|
|
value++;
|
|
if (strcmp(options[i], "slave_selection_criteria") == 0)
|
|
{
|
|
c = GET_SELECT_CRITERIA(value);
|
|
ss_dassert(
|
|
c == LEAST_GLOBAL_CONNECTIONS ||
|
|
c == LEAST_ROUTER_CONNECTIONS ||
|
|
c == LEAST_BEHIND_MASTER ||
|
|
c == LEAST_CURRENT_OPERATIONS ||
|
|
c == UNDEFINED_CRITERIA);
|
|
|
|
if (c == UNDEFINED_CRITERIA)
|
|
{
|
|
LOGIF(LE, (skygw_log_write(
|
|
LOGFILE_ERROR, "Warning : Unknown "
|
|
"slave selection criteria \"%s\". "
|
|
"Allowed values are LEAST_GLOBAL_CONNECTIONS, "
|
|
"LEAST_ROUTER_CONNECTIONS, "
|
|
"LEAST_BEHIND_MASTER,"
|
|
"and LEAST_CURRENT_OPERATIONS.",
|
|
STRCRITERIA(router->rwsplit_config.rw_slave_select_criteria))));
|
|
}
|
|
else
|
|
{
|
|
router->rwsplit_config.rw_slave_select_criteria = c;
|
|
}
|
|
}
|
|
}
|
|
} /*< for */
|
|
}
|
|
|
|
/**
|
|
* Error Handler routine to resolve backend failures. If it succeeds then there
|
|
* are enough operative backends available and connected. Otherwise it fails,
|
|
* and session is terminated.
|
|
*
|
|
* @param instance The router instance
|
|
* @param router_session The router session
|
|
* @param message The error message to reply
|
|
* @param backend_dcb The backend DCB
|
|
* @param action The action: REPLY, REPLY_AND_CLOSE, NEW_CONNECTION
|
|
* @param succp Result of action.
|
|
*
|
|
* Even if succp == true connecting to new slave may have failed. succp is to
|
|
* tell whether router has enough master/slave connections to continue work.
|
|
*/
|
|
|
|
static void handleError (
|
|
ROUTER* instance,
|
|
void* router_session,
|
|
GWBUF* errmsgbuf,
|
|
DCB* backend_dcb,
|
|
error_action_t action,
|
|
bool* succp)
|
|
{
|
|
SESSION* session;
|
|
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance;
|
|
ROUTER_CLIENT_SES* rses = (ROUTER_CLIENT_SES *)router_session;
|
|
|
|
CHK_DCB(backend_dcb);
|
|
#if defined(SS_DEBUG)
|
|
backend_dcb->dcb_errhandle_called = true;
|
|
#endif
|
|
session = backend_dcb->session;
|
|
|
|
if (session != NULL)
|
|
CHK_SESSION(session);
|
|
|
|
switch (action) {
|
|
case ERRACT_NEW_CONNECTION:
|
|
{
|
|
if (rses != NULL)
|
|
CHK_CLIENT_RSES(rses);
|
|
|
|
if (!rses_begin_locked_router_action(rses))
|
|
{
|
|
*succp = false;
|
|
return;
|
|
}
|
|
|
|
*succp = handle_error_new_connection(inst,
|
|
rses,
|
|
backend_dcb,
|
|
errmsgbuf);
|
|
rses_end_locked_router_action(rses);
|
|
break;
|
|
}
|
|
|
|
case ERRACT_REPLY_CLIENT:
|
|
{
|
|
*succp = handle_error_reply_client(session, errmsgbuf);
|
|
break;
|
|
}
|
|
|
|
default:
|
|
*succp = false;
|
|
break;
|
|
}
|
|
}
|
|
|
|
|
|
static bool handle_error_reply_client(
|
|
SESSION* ses,
|
|
GWBUF* errmsg)
|
|
{
|
|
session_state_t sesstate;
|
|
DCB* client_dcb;
|
|
bool succp;
|
|
|
|
spinlock_acquire(&ses->ses_lock);
|
|
sesstate = ses->state;
|
|
client_dcb = ses->client;
|
|
spinlock_release(&ses->ses_lock);
|
|
|
|
if (sesstate == SESSION_STATE_ROUTER_READY)
|
|
{
|
|
CHK_DCB(client_dcb);
|
|
client_dcb->func.write(client_dcb, errmsg);
|
|
}
|
|
succp = false; /** false because new servers aren's selected. */
|
|
|
|
return succp;
|
|
}
|
|
|
|
/**
|
|
* This must be called with router lock
|
|
*/
|
|
static bool handle_error_new_connection(
|
|
ROUTER_INSTANCE* inst,
|
|
ROUTER_CLIENT_SES* rses,
|
|
DCB* backend_dcb,
|
|
GWBUF* errmsg)
|
|
{
|
|
SESSION* ses;
|
|
int router_nservers;
|
|
int max_nslaves;
|
|
int max_slave_rlag;
|
|
backend_ref_t* bref;
|
|
bool succp;
|
|
|
|
ss_dassert(SPINLOCK_IS_LOCKED(&rses->rses_lock));
|
|
|
|
ses = backend_dcb->session;
|
|
CHK_SESSION(ses);
|
|
|
|
bref = get_bref_from_dcb(rses, backend_dcb);
|
|
|
|
/** failed DCB has already been replaced */
|
|
if (bref == NULL)
|
|
{
|
|
succp = true;
|
|
goto return_succp;
|
|
}
|
|
/**
|
|
* Error handler is already called for this DCB because
|
|
* it's not polling anymore. It can be assumed that
|
|
* it succeed because rses isn't closed.
|
|
*/
|
|
if (backend_dcb->state != DCB_STATE_POLLING)
|
|
{
|
|
succp = true;
|
|
goto return_succp;
|
|
}
|
|
|
|
CHK_BACKEND_REF(bref);
|
|
|
|
if (BREF_IS_WAITING_RESULT(bref))
|
|
{
|
|
DCB* client_dcb;
|
|
client_dcb = ses->client;
|
|
client_dcb->func.write(client_dcb, errmsg);
|
|
bref_clear_state(bref, BREF_WAITING_RESULT);
|
|
}
|
|
bref_clear_state(bref, BREF_IN_USE);
|
|
bref_set_state(bref, BREF_CLOSED);
|
|
/**
|
|
* Remove callback because this DCB won't be used
|
|
* unless it is reconnected later, and then the callback
|
|
* is set again.
|
|
*/
|
|
dcb_remove_callback(backend_dcb,
|
|
DCB_REASON_NOT_RESPONDING,
|
|
&router_handle_state_switch,
|
|
(void *)bref);
|
|
|
|
router_nservers = router_get_servercount(inst);
|
|
max_nslaves = rses_get_max_slavecount(rses, router_nservers);
|
|
max_slave_rlag = rses_get_max_replication_lag(rses);
|
|
/**
|
|
* Try to get replacement slave or at least the minimum
|
|
* number of slave connections for router session.
|
|
*/
|
|
succp = select_connect_backend_servers(
|
|
&rses->rses_master_ref,
|
|
rses->rses_backend_ref,
|
|
router_nservers,
|
|
max_nslaves,
|
|
max_slave_rlag,
|
|
rses->rses_config.rw_slave_select_criteria,
|
|
ses,
|
|
inst);
|
|
|
|
return_succp:
|
|
return succp;
|
|
}
|
|
|
|
|
|
static void print_error_packet(
|
|
ROUTER_CLIENT_SES* rses,
|
|
GWBUF* buf,
|
|
DCB* dcb)
|
|
{
|
|
#if defined(SS_DEBUG)
|
|
if (GWBUF_IS_TYPE_MYSQL(buf))
|
|
{
|
|
while (gwbuf_length(buf) > 0)
|
|
{
|
|
/**
|
|
* This works with MySQL protocol only !
|
|
* Protocol specific packet print functions would be nice.
|
|
*/
|
|
uint8_t* ptr = GWBUF_DATA(buf);
|
|
size_t len = MYSQL_GET_PACKET_LEN(ptr);
|
|
|
|
if (MYSQL_GET_COMMAND(ptr) == 0xff)
|
|
{
|
|
SERVER* srv = NULL;
|
|
backend_ref_t* bref = rses->rses_backend_ref;
|
|
int i;
|
|
char* bufstr;
|
|
|
|
for (i=0; i<rses->rses_nbackends; i++)
|
|
{
|
|
if (bref[i].bref_dcb == dcb)
|
|
{
|
|
srv = bref[i].bref_backend->backend_server;
|
|
}
|
|
}
|
|
ss_dassert(srv != NULL);
|
|
|
|
bufstr = strndup(&ptr[7], len-3);
|
|
|
|
LOGIF(LE, (skygw_log_write_flush(
|
|
LOGFILE_ERROR,
|
|
"Error : Backend server %s:%d responded with "
|
|
"error : %s",
|
|
srv->name,
|
|
srv->port,
|
|
bufstr)));
|
|
free(bufstr);
|
|
}
|
|
buf = gwbuf_consume(buf, len+4);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
while ((buf = gwbuf_consume(buf, GWBUF_LENGTH(buf))) != NULL);
|
|
}
|
|
#endif /*< SS_DEBUG */
|
|
}
|
|
|
|
static int router_get_servercount(
|
|
ROUTER_INSTANCE* inst)
|
|
{
|
|
int router_nservers = 0;
|
|
BACKEND** b = inst->servers;
|
|
/** count servers */
|
|
while (*(b++) != NULL) router_nservers++;
|
|
|
|
return router_nservers;
|
|
}
|
|
|
|
static bool have_enough_servers(
|
|
ROUTER_CLIENT_SES** p_rses,
|
|
const int min_nsrv,
|
|
int router_nsrv,
|
|
ROUTER_INSTANCE* router)
|
|
{
|
|
bool succp;
|
|
|
|
/** With too few servers session is not created */
|
|
if (router_nsrv < min_nsrv ||
|
|
MAX((*p_rses)->rses_config.rw_max_slave_conn_count,
|
|
(router_nsrv*(*p_rses)->rses_config.rw_max_slave_conn_percent)/100)
|
|
< min_nsrv)
|
|
{
|
|
if (router_nsrv < min_nsrv)
|
|
{
|
|
LOGIF(LE, (skygw_log_write_flush(
|
|
LOGFILE_ERROR,
|
|
"Error : Unable to start %s service. There are "
|
|
"too few backend servers available. Found %d "
|
|
"when %d is required.",
|
|
router->service->name,
|
|
router_nsrv,
|
|
min_nsrv)));
|
|
}
|
|
else
|
|
{
|
|
double pct = (*p_rses)->rses_config.rw_max_slave_conn_percent/100;
|
|
double nservers = (double)router_nsrv*pct;
|
|
|
|
if ((*p_rses)->rses_config.rw_max_slave_conn_count < min_nsrv)
|
|
{
|
|
LOGIF(LE, (skygw_log_write_flush(
|
|
LOGFILE_ERROR,
|
|
"Error : Unable to start %s service. There are "
|
|
"too few backend servers configured in "
|
|
"MaxScale.cnf. Found %d when %d is required.",
|
|
router->service->name,
|
|
(*p_rses)->rses_config.rw_max_slave_conn_count,
|
|
min_nsrv)));
|
|
}
|
|
if (nservers < min_nsrv)
|
|
{
|
|
LOGIF(LE, (skygw_log_write_flush(
|
|
LOGFILE_ERROR,
|
|
"Error : Unable to start %s service. There are "
|
|
"too few backend servers configured in "
|
|
"MaxScale.cnf. Found %d%% when at least %.0f%% "
|
|
"would be required.",
|
|
router->service->name,
|
|
(*p_rses)->rses_config.rw_max_slave_conn_percent,
|
|
min_nsrv/(((double)router_nsrv)/100))));
|
|
}
|
|
}
|
|
free(*p_rses);
|
|
*p_rses = NULL;
|
|
succp = false;
|
|
}
|
|
else
|
|
{
|
|
succp = true;
|
|
}
|
|
return succp;
|
|
}
|
|
|
|
/**
|
|
* Find out the number of read backend servers.
|
|
* Depending on the configuration value type, either copy direct count
|
|
* of slave connections or calculate the count from percentage value.
|
|
*/
|
|
static int rses_get_max_slavecount(
|
|
ROUTER_CLIENT_SES* rses,
|
|
int router_nservers)
|
|
{
|
|
int conf_max_nslaves;
|
|
int max_nslaves;
|
|
|
|
CHK_CLIENT_RSES(rses);
|
|
|
|
if (rses->rses_config.rw_max_slave_conn_count > 0)
|
|
{
|
|
conf_max_nslaves = rses->rses_config.rw_max_slave_conn_count;
|
|
}
|
|
else
|
|
{
|
|
conf_max_nslaves =
|
|
(router_nservers*rses->rses_config.rw_max_slave_conn_percent)/100;
|
|
}
|
|
max_nslaves = MIN(router_nservers-1, MAX(1, conf_max_nslaves));
|
|
|
|
return max_nslaves;
|
|
}
|
|
|
|
|
|
static int rses_get_max_replication_lag(
|
|
ROUTER_CLIENT_SES* rses)
|
|
{
|
|
int conf_max_rlag;
|
|
|
|
CHK_CLIENT_RSES(rses);
|
|
|
|
/** if there is no configured value, then longest possible int is used */
|
|
if (rses->rses_config.rw_max_slave_replication_lag > 0)
|
|
{
|
|
conf_max_rlag = rses->rses_config.rw_max_slave_replication_lag;
|
|
}
|
|
else
|
|
{
|
|
conf_max_rlag = ~(1<<31);
|
|
}
|
|
|
|
return conf_max_rlag;
|
|
}
|
|
|
|
|
|
static backend_ref_t* get_bref_from_dcb(
|
|
ROUTER_CLIENT_SES* rses,
|
|
DCB* dcb)
|
|
{
|
|
backend_ref_t* bref;
|
|
int i = 0;
|
|
CHK_DCB(dcb);
|
|
CHK_CLIENT_RSES(rses);
|
|
|
|
bref = rses->rses_backend_ref;
|
|
|
|
while (i<rses->rses_nbackends)
|
|
{
|
|
if (bref->bref_dcb == dcb)
|
|
{
|
|
break;
|
|
}
|
|
bref++;
|
|
i += 1;
|
|
}
|
|
|
|
if (i == rses->rses_nbackends)
|
|
{
|
|
bref = NULL;
|
|
}
|
|
return bref;
|
|
}
|
|
|
|
static int router_handle_state_switch(
|
|
DCB* dcb,
|
|
DCB_REASON reason,
|
|
void* data)
|
|
{
|
|
backend_ref_t* bref;
|
|
int rc = 1;
|
|
ROUTER_CLIENT_SES* rses;
|
|
SESSION* ses;
|
|
SERVER* srv;
|
|
|
|
CHK_DCB(dcb);
|
|
bref = (backend_ref_t *)data;
|
|
CHK_BACKEND_REF(bref);
|
|
|
|
srv = bref->bref_backend->backend_server;
|
|
|
|
if (SERVER_IS_RUNNING(srv) && SERVER_IS_IN_CLUSTER(srv))
|
|
{
|
|
goto return_rc;
|
|
}
|
|
ses = dcb->session;
|
|
CHK_SESSION(ses);
|
|
|
|
rses = (ROUTER_CLIENT_SES *)dcb->session->router_session;
|
|
CHK_CLIENT_RSES(rses);
|
|
|
|
switch (reason) {
|
|
case DCB_REASON_NOT_RESPONDING:
|
|
dcb->func.hangup(dcb);
|
|
break;
|
|
|
|
default:
|
|
break;
|
|
}
|
|
|
|
return_rc:
|
|
return rc;
|
|
}
|
|
|
|
static sescmd_cursor_t* backend_ref_get_sescmd_cursor (
|
|
backend_ref_t* bref)
|
|
{
|
|
sescmd_cursor_t* scur;
|
|
CHK_BACKEND_REF(bref);
|
|
|
|
scur = &bref->bref_sescmd_cur;
|
|
CHK_SESCMD_CUR(scur);
|
|
|
|
return scur;
|
|
}
|
|
|
|
#if defined(PREP_STMT_CACHING)
|
|
#define MAX_STMT_LEN 1024
|
|
|
|
static prep_stmt_t* prep_stmt_init(
|
|
prep_stmt_type_t type,
|
|
void* id)
|
|
{
|
|
prep_stmt_t* pstmt;
|
|
|
|
pstmt = (prep_stmt_t *)calloc(1, sizeof(prep_stmt_t));
|
|
|
|
if (pstmt != NULL)
|
|
{
|
|
#if defined(SS_DEBUG)
|
|
pstmt->pstmt_chk_top = CHK_NUM_PREP_STMT;
|
|
pstmt->pstmt_chk_tail = CHK_NUM_PREP_STMT;
|
|
#endif
|
|
pstmt->pstmt_state = PREP_STMT_ALLOC;
|
|
pstmt->pstmt_type = type;
|
|
|
|
if (type == PREP_STMT_NAME)
|
|
{
|
|
pstmt->pstmt_id.name = strndup((char *)id, MAX_STMT_LEN);
|
|
}
|
|
else
|
|
{
|
|
pstmt->pstmt_id.seq = 0;
|
|
}
|
|
}
|
|
CHK_PREP_STMT(pstmt);
|
|
return pstmt;
|
|
}
|
|
|
|
static void prep_stmt_done(
|
|
prep_stmt_t* pstmt)
|
|
{
|
|
CHK_PREP_STMT(pstmt);
|
|
|
|
if (pstmt->pstmt_type == PREP_STMT_NAME)
|
|
{
|
|
free(pstmt->pstmt_id.name);
|
|
}
|
|
free(pstmt);
|
|
}
|
|
|
|
static bool prep_stmt_drop(
|
|
prep_stmt_t* pstmt)
|
|
{
|
|
CHK_PREP_STMT(pstmt);
|
|
|
|
pstmt->pstmt_state = PREP_STMT_DROPPED;
|
|
return true;
|
|
}
|
|
#endif /*< PREP_STMT_CACHING */
|
|
|
|
/********************************
|
|
* This routine returns the root master server from MySQL replication tree
|
|
* Get the root Master rule:
|
|
*
|
|
* find server with the lowest replication depth level
|
|
* and the SERVER_MASTER bitval
|
|
* Servers are checked even if they are in 'maintenance'
|
|
*
|
|
* @param servers The list of servers
|
|
* @param router_nservers The number of servers
|
|
* @return The Master found
|
|
*
|
|
*/
|
|
static BACKEND *get_root_master(backend_ref_t *servers, int router_nservers) {
|
|
int i = 0;
|
|
BACKEND * master_host = NULL;
|
|
|
|
for (i = 0; i< router_nservers; i++) {
|
|
BACKEND* b = NULL;
|
|
b = servers[i].bref_backend;
|
|
if (b && (b->backend_server->status & (SERVER_MASTER|SERVER_MAINT)) == SERVER_MASTER) {
|
|
if (master_host && b->backend_server->depth < master_host->backend_server->depth) {
|
|
master_host = b;
|
|
} else {
|
|
if (master_host == NULL) {
|
|
master_host = b;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return master_host;
|
|
}
|