Files
MaxScale/server/modules/routing/readwritesplit/readwritesplit.c
Mark Riddoch 1245fba35b Addition of the module info structure to allow module information to
be extracted from the modules.

This gives a way to verify the API that the module provides as well
as the version of that API. The hope is that this will make it possible
for MaxScale to detect out of date plugins and either adapt to use them
or reject loading them.

Also added the ability to set a release state on a per module basis.
This allows for production ready and non-production ready plugins to
be identified.
2014-06-02 17:10:05 +01:00

2463 lines
87 KiB
C

/*
* This file is distributed as part of the SkySQL Gateway. It is free
* software: you can redistribute it and/or modify it under the terms of the
* GNU General Public License as published by the Free Software Foundation,
* version 2.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along with
* this program; if not, write to the Free Software Foundation, Inc., 51
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Copyright SkySQL Ab 2013
*/
#include <stdio.h>
#include <strings.h>
#include <string.h>
#include <stdlib.h>
#include <stdint.h>
#include <router.h>
#include <readwritesplit.h>
#include <mysql.h>
#include <skygw_utils.h>
#include <log_manager.h>
#include <query_classifier.h>
#include <dcb.h>
#include <spinlock.h>
#include <modinfo.h>
MODULE_INFO info = {
MODULE_API_ROUTER,
MODULE_ALPHA_RELEASE,
ROUTER_VERSION,
"A Read/Write splitting router for enhancement read scalability"
};
extern int lm_enabled_logfiles_bitmask;
/**
* @file readwritesplit.c The entry points for the read/write query splitting
* router module.
*
* This file contains the entry points that comprise the API to the read write
* query splitting router.
* @verbatim
* Revision History
*
* Date Who Description
* 01/07/2013 Vilho Raatikka Initial implementation
* 15/07/2013 Massimiliano Pinto Added clientReply
* from master only in case of session change
* 17/07/2013 Massimiliano Pinto clientReply is now used by mysql_backend
* for all reply situations
* 18/07/2013 Massimiliano Pinto routeQuery now handles COM_QUIT
* as QUERY_TYPE_SESSION_WRITE
* 17/07/2014 Massimiliano Pinto Server connection counter is updated in closeSession
*
* @endverbatim
*/
static char *version_str = "V1.0.2";
static ROUTER* createInstance(SERVICE *service, char **options);
static void* newSession(ROUTER *instance, SESSION *session);
static void closeSession(ROUTER *instance, void *session);
static void freeSession(ROUTER *instance, void *session);
static int routeQuery(ROUTER *instance, void *session, GWBUF *queue);
static void diagnostic(ROUTER *instance, DCB *dcb);
static void clientReply(
ROUTER* instance,
void* router_session,
GWBUF* queue,
DCB* backend_dcb);
static uint8_t getCapabilities (ROUTER* inst, void* router_session);
int bref_cmp_global_conn(
const void* bref1,
const void* bref2);
int bref_cmp_router_conn(
const void* bref1,
const void* bref2);
int bref_cmp_behind_master(
const void* bref1,
const void* bref2);
int (*criteria_cmpfun[LAST_CRITERIA])(const void*, const void*)=
{
NULL,
bref_cmp_global_conn,
bref_cmp_router_conn,
bref_cmp_behind_master
};
static bool select_connect_backend_servers(
backend_ref_t** p_master_ref,
backend_ref_t* backend_ref,
int router_nservers,
int max_nslaves,
select_criteria_t select_criteria,
SESSION* session,
ROUTER_INSTANCE* router);
static bool get_dcb(
DCB** dcb,
ROUTER_CLIENT_SES* rses,
backend_type_t btype);
static void rwsplit_process_options(
ROUTER_INSTANCE* router,
char** options);
static ROUTER_OBJECT MyObject = {
createInstance,
newSession,
closeSession,
freeSession,
routeQuery,
diagnostic,
clientReply,
NULL,
getCapabilities
};
static bool rses_begin_locked_router_action(
ROUTER_CLIENT_SES* rses);
static void rses_end_locked_router_action(
ROUTER_CLIENT_SES* rses);
static void mysql_sescmd_done(
mysql_sescmd_t* sescmd);
static mysql_sescmd_t* mysql_sescmd_init (
rses_property_t* rses_prop,
GWBUF* sescmd_buf,
unsigned char packet_type,
ROUTER_CLIENT_SES* rses);
static rses_property_t* mysql_sescmd_get_property(
mysql_sescmd_t* scmd);
static rses_property_t* rses_property_init(
rses_property_type_t prop_type);
static void rses_property_add(
ROUTER_CLIENT_SES* rses,
rses_property_t* prop);
static void rses_property_done(
rses_property_t* prop);
static mysql_sescmd_t* rses_property_get_sescmd(
rses_property_t* prop);
static bool execute_sescmd_in_backend(
backend_ref_t* backend_ref);
static void sescmd_cursor_set_active(
sescmd_cursor_t* sescmd_cursor,
bool value);
static bool sescmd_cursor_is_active(
sescmd_cursor_t* sescmd_cursor);
static GWBUF* sescmd_cursor_clone_querybuf(
sescmd_cursor_t* scur);
static mysql_sescmd_t* sescmd_cursor_get_command(
sescmd_cursor_t* scur);
static bool sescmd_cursor_next(
sescmd_cursor_t* scur);
static GWBUF* sescmd_cursor_process_replies(
DCB* client_dcb,
GWBUF* replybuf,
sescmd_cursor_t* scur);
static void tracelog_routed_query(
ROUTER_CLIENT_SES* rses,
char* funcname,
backend_ref_t* bref,
GWBUF* buf);
static bool route_session_write(
ROUTER_CLIENT_SES* router_client_ses,
GWBUF* querybuf,
ROUTER_INSTANCE* inst,
unsigned char packet_type,
skygw_query_type_t qtype);
static void refreshInstance(
ROUTER_INSTANCE* router,
CONFIG_PARAMETER* param);
static SPINLOCK instlock;
static ROUTER_INSTANCE* instances;
/**
* Implementation of the mandatory version entry point
*
* @return version string of the module
*/
char *
version()
{
return version_str;
}
/**
* The module initialisation routine, called when the module
* is first loaded.
*/
void
ModuleInit()
{
LOGIF(LM, (skygw_log_write_flush(
LOGFILE_MESSAGE,
"Initializing statemend-based read/write split router module.")));
spinlock_init(&instlock);
instances = NULL;
}
/**
* The module entry point routine. It is this routine that
* must populate the structure that is referred to as the
* "module object", this is a structure with the set of
* external entry points for this module.
*
* @return The module object
*/
ROUTER_OBJECT* GetModuleObject()
{
return &MyObject;
}
static void refreshInstance(
ROUTER_INSTANCE* router,
CONFIG_PARAMETER* param)
{
config_param_type_t paramtype;
paramtype = config_get_paramtype(param);
if (paramtype == COUNT_TYPE)
{
if (strncmp(param->name, "max_slave_connections", MAX_PARAM_LEN) == 0)
{
router->rwsplit_config.rw_max_slave_conn_percent = 0;
router->rwsplit_config.rw_max_slave_conn_count =
config_get_valint(param, NULL, paramtype);
}
}
else if (paramtype == PERCENT_TYPE)
{
if (strncmp(param->name, "max_slave_connections", MAX_PARAM_LEN) == 0)
{
router->rwsplit_config.rw_max_slave_conn_count = 0;
router->rwsplit_config.rw_max_slave_conn_percent =
config_get_valint(param, NULL, paramtype);
}
}
}
/**
* Create an instance of read/write statemtn router within the MaxScale.
*
*
* @param service The service this router is being create for
* @param options The options for this query router
*
* @return NULL in failure, pointer to router in success.
*/
static ROUTER* createInstance(
SERVICE* service,
char** options)
{
ROUTER_INSTANCE* router;
SERVER* server;
int nservers;
int i;
CONFIG_PARAMETER* param;
if ((router = calloc(1, sizeof(ROUTER_INSTANCE))) == NULL) {
return NULL;
}
router->service = service;
spinlock_init(&router->lock);
/** Calculate number of servers */
server = service->databases;
nservers = 0;
while (server != NULL)
{
nservers++;
server=server->nextdb;
}
router->servers = (BACKEND **)calloc(nservers + 1, sizeof(BACKEND *));
if (router->servers == NULL)
{
free(router);
return NULL;
}
/**
* Create an array of the backend servers in the router structure to
* maintain a count of the number of connections to each
* backend server.
*/
server = service->databases;
nservers= 0;
while (server != NULL) {
if ((router->servers[nservers] = malloc(sizeof(BACKEND))) == NULL)
{
/** clean up */
for (i = 0; i < nservers; i++) {
free(router->servers[i]);
}
free(router->servers);
free(router);
return NULL;
}
router->servers[nservers]->backend_server = server;
router->servers[nservers]->backend_conn_count = 0;
router->servers[nservers]->be_valid = false;
#if defined(SS_DEBUG)
router->servers[nservers]->be_chk_top = CHK_NUM_BACKEND;
router->servers[nservers]->be_chk_tail = CHK_NUM_BACKEND;
#endif
nservers += 1;
server = server->nextdb;
}
router->servers[nservers] = NULL;
/**
* vraa : is this necessary for readwritesplit ?
* Option : where can a read go?
* - master (only)
* - slave (only)
* - joined (to both)
*
* Process the options
*/
router->bitmask = 0;
router->bitvalue = 0;
if (options)
{
rwsplit_process_options(router, options);
}
/**
* Set default value for max_slave_connections and for slave selection
* criteria. If parameter is set in config file max_slave_connections
* will be overwritten.
*/
router->rwsplit_config.rw_max_slave_conn_count = CONFIG_MAX_SLAVE_CONN;
if (router->rwsplit_config.rw_slave_select_criteria == UNDEFINED_CRITERIA)
{
router->rwsplit_config.rw_slave_select_criteria = DEFAULT_CRITERIA;
}
/**
* Copy all config parameters from service to router instance.
* Finally, copy version number to indicate that configs match.
*/
param = config_get_param(service->svc_config_param, "max_slave_connections");
if (param != NULL)
{
refreshInstance(router, param);
router->rwsplit_version = service->svc_config_version;
}
/**
* We have completed the creation of the router data, so now
* insert this router into the linked list of routers
* that have been created with this module.
*/
spinlock_acquire(&instlock);
router->next = instances;
instances = router;
spinlock_release(&instlock);
return (ROUTER *)router;
}
/**
* Associate a new session with this instance of the router.
*
* The session is used to store all the data required for a particular
* client connection.
*
* @param instance The router instance data
* @param session The session itself
* @return Session specific data for this session
*/
static void* newSession(
ROUTER* router_inst,
SESSION* session)
{
backend_ref_t* backend_ref; /*< array of backend references (DCB,BACKEND,cursor) */
backend_ref_t* master_ref = NULL; /*< pointer to selected master */
BACKEND** b;
ROUTER_CLIENT_SES* client_rses = NULL;
ROUTER_INSTANCE* router = (ROUTER_INSTANCE *)router_inst;
bool succp;
int router_nservers = 0; /*< # of servers in total */
int max_nslaves; /*< max # of slaves used in this session */
int conf_max_nslaves; /*< value from configuration file */
int i;
const int min_nservers = 1; /*< hard-coded for now */
static uint64_t router_client_ses_seq; /*< ID for client session */
client_rses = (ROUTER_CLIENT_SES *)calloc(1, sizeof(ROUTER_CLIENT_SES));
if (client_rses == NULL)
{
ss_dassert(false);
goto return_rses;
}
#if defined(SS_DEBUG)
client_rses->rses_chk_top = CHK_NUM_ROUTER_SES;
client_rses->rses_chk_tail = CHK_NUM_ROUTER_SES;
#endif
/**
* If service config has been changed, reload config from service to
* router instance first.
*/
spinlock_acquire(&router->lock);
if (router->service->svc_config_version > router->rwsplit_version)
{
CONFIG_PARAMETER* param = router->service->svc_config_param;
while (param != NULL)
{
refreshInstance(router, param);
param = param->next;
}
router->rwsplit_version = router->service->svc_config_version;
/** Read options */
rwsplit_process_options(router, router->service->routerOptions);
}
/** Copy config struct from router instance */
client_rses->rses_config = router->rwsplit_config;
/** Create ID for the new client (router_client_ses) session */
client_rses->rses_id = router_client_ses_seq += 1;
spinlock_release(&router->lock);
/**
* Set defaults to session variables.
*/
client_rses->rses_autocommit_enabled = true;
client_rses->rses_transaction_active = false;
/** count servers */
b = router->servers;
while (*(b++) != NULL) router_nservers++;
/** With too few servers session is not created */
if (router_nservers < min_nservers ||
MAX(client_rses->rses_config.rw_max_slave_conn_count,
(router_nservers*client_rses->rses_config.rw_max_slave_conn_percent)/100)
< min_nservers)
{
if (router_nservers < min_nservers)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Unable to start %s service. There are "
"too few backend servers available. Found %d "
"when %d is required.",
router->service->name,
router_nservers,
min_nservers)));
}
else
{
double pct = client_rses->rses_config.rw_max_slave_conn_percent/100;
double nservers = (double)router_nservers*pct;
if (client_rses->rses_config.rw_max_slave_conn_count <
min_nservers)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Unable to start %s service. There are "
"too few backend servers configured in "
"MaxScale.cnf. Found %d when %d is required.",
router->service->name,
client_rses->rses_config.rw_max_slave_conn_count,
min_nservers)));
}
if (nservers < min_nservers)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Unable to start %s service. There are "
"too few backend servers configured in "
"MaxScale.cnf. Found %d%% when at least %.0f%% "
"would be required.",
router->service->name,
client_rses->rses_config.rw_max_slave_conn_percent,
min_nservers/(((double)router_nservers)/100))));
}
}
free(client_rses);
client_rses = NULL;
goto return_rses;
}
/**
* Create backend reference objects for this session.
*/
backend_ref = (backend_ref_t *)calloc (1, router_nservers*sizeof(backend_ref_t));
if (backend_ref == NULL)
{
/** log this */
free(client_rses);
free(backend_ref);
client_rses = NULL;
goto return_rses;
}
/**
* Initialize backend references with BACKEND ptr.
* Initialize session command cursors for each backend reference.
*/
for (i=0; i< router_nservers; i++)
{
#if defined(SS_DEBUG)
backend_ref[i].bref_chk_top = CHK_NUM_BACKEND_REF;
backend_ref[i].bref_chk_tail = CHK_NUM_BACKEND_REF;
backend_ref[i].bref_sescmd_cur.scmd_cur_chk_top = CHK_NUM_SESCMD_CUR;
backend_ref[i].bref_sescmd_cur.scmd_cur_chk_tail = CHK_NUM_SESCMD_CUR;
#endif
backend_ref[i].bref_backend = router->servers[i];
/** store pointers to sescmd list to both cursors */
backend_ref[i].bref_sescmd_cur.scmd_cur_rses = client_rses;
backend_ref[i].bref_sescmd_cur.scmd_cur_active = false;
backend_ref[i].bref_sescmd_cur.scmd_cur_ptr_property =
&client_rses->rses_properties[RSES_PROP_TYPE_SESCMD];
backend_ref[i].bref_sescmd_cur.scmd_cur_cmd = NULL;
}
/**
* Find out the number of read backend servers.
* Depending on the configuration value type, either copy direct count
* of slave connections or calculate the count from percentage value.
*/
if (client_rses->rses_config.rw_max_slave_conn_count > 0)
{
conf_max_nslaves = client_rses->rses_config.rw_max_slave_conn_count;
}
else
{
conf_max_nslaves =
(router_nservers*client_rses->rses_config.rw_max_slave_conn_percent)/100;
}
max_nslaves = MIN(router_nservers-1, MAX(1, conf_max_nslaves));
spinlock_init(&client_rses->rses_lock);
client_rses->rses_backend_ref = backend_ref;
/**
* Find a backend servers to connect to.
*/
succp = select_connect_backend_servers(&master_ref,
backend_ref,
router_nservers,
max_nslaves,
client_rses->rses_config.rw_slave_select_criteria,
session,
router);
/** Both Master and at least 1 slave must be found */
if (!succp) {
free(client_rses->rses_backend_ref);
free(client_rses);
client_rses = NULL;
goto return_rses;
}
/** Copy backend pointers to router session. */
client_rses->rses_master_ref = master_ref;
client_rses->rses_backend_ref = backend_ref;
client_rses->rses_nbackends = router_nservers; /*< # of backend servers */
client_rses->rses_capabilities = RCAP_TYPE_STMT_INPUT;
router->stats.n_sessions += 1;
/**
* Version is bigger than zero once initialized.
*/
atomic_add(&client_rses->rses_versno, 2);
ss_dassert(client_rses->rses_versno == 2);
/**
* Add this session to end of the list of active sessions in router.
*/
spinlock_acquire(&router->lock);
client_rses->next = router->connections;
router->connections = client_rses;
spinlock_release(&router->lock);
return_rses:
#if defined(SS_DEBUG)
if (client_rses != NULL)
{
CHK_CLIENT_RSES(client_rses);
}
#endif
return (void *)client_rses;
}
/**
* Close a session with the router, this is the mechanism
* by which a router may cleanup data structure etc.
*
* @param instance The router instance data
* @param session The session being closed
*/
static void closeSession(
ROUTER* instance,
void* router_session)
{
ROUTER_CLIENT_SES* router_cli_ses;
backend_ref_t* backend_ref;
router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
CHK_CLIENT_RSES(router_cli_ses);
backend_ref = router_cli_ses->rses_backend_ref;
/**
* Lock router client session for secure read and update.
*/
if (!router_cli_ses->rses_closed &&
rses_begin_locked_router_action(router_cli_ses))
{
int i = 0;
/**
* session must be moved to SESSION_STATE_STOPPING state before
* router session is closed.
*/
#if defined(SS_DEBUG)
SESSION* ses = get_session_by_router_ses((void*)router_cli_ses);
ss_dassert(ses != NULL);
ss_dassert(ses->state == SESSION_STATE_STOPPING);
#endif
/**
* This sets router closed. Nobody is allowed to use router
* whithout checking this first.
*/
router_cli_ses->rses_closed = true;
for (i=0; i<router_cli_ses->rses_nbackends; i++)
{
DCB* dcb = backend_ref[i].bref_dcb;
/** Close those which had been connected */
if (dcb != NULL)
{
CHK_DCB(dcb);
backend_ref[i].bref_dcb = NULL; /*< prevent new uses of DCB */
dcb->func.close(dcb);
/** decrease server current connection counters */
atomic_add(&backend_ref[i].bref_backend->backend_server->stats.n_current, -1);
atomic_add(&backend_ref[i].bref_backend->backend_conn_count, -1);
}
}
/** Unlock */
rses_end_locked_router_action(router_cli_ses);
}
}
static void freeSession(
ROUTER* router_instance,
void* router_client_session)
{
ROUTER_CLIENT_SES* router_cli_ses;
ROUTER_INSTANCE* router;
int i;
backend_ref_t* backend_ref;
router_cli_ses = (ROUTER_CLIENT_SES *)router_client_session;
router = (ROUTER_INSTANCE *)router_instance;
backend_ref = router_cli_ses->rses_backend_ref;
for (i=0; i<router_cli_ses->rses_nbackends; i++)
{
if (backend_ref[i].bref_dcb == NULL)
{
continue;
}
ss_dassert(backend_ref[i].bref_backend->backend_conn_count > 0);
atomic_add(&backend_ref[i].bref_backend->backend_conn_count, -1);
}
spinlock_acquire(&router->lock);
if (router->connections == router_cli_ses) {
router->connections = router_cli_ses->next;
} else {
ROUTER_CLIENT_SES* ptr = router->connections;
while (ptr && ptr->next != router_cli_ses) {
ptr = ptr->next;
}
if (ptr) {
ptr->next = router_cli_ses->next;
}
}
spinlock_release(&router->lock);
/**
* For each property type, walk through the list, finalize properties
* and free the allocated memory.
*/
for (i=RSES_PROP_TYPE_FIRST; i<RSES_PROP_TYPE_COUNT; i++)
{
rses_property_t* p = router_cli_ses->rses_properties[i];
rses_property_t* q = p;
while (p != NULL)
{
q = p->rses_prop_next;
rses_property_done(p);
p = q;
}
}
/*
* We are no longer in the linked list, free
* all the memory and other resources associated
* to the client session.
*/
free(router_cli_ses->rses_backend_ref);
free(router_cli_ses);
return;
}
static bool get_dcb(
DCB** p_dcb,
ROUTER_CLIENT_SES* rses,
backend_type_t btype)
{
backend_ref_t* backend_ref;
int smallest_nconn = -1;
int i;
bool succp = false;
CHK_CLIENT_RSES(rses);
ss_dassert(p_dcb != NULL && *(p_dcb) == NULL);
if (p_dcb == NULL)
{
goto return_succp;
}
backend_ref = rses->rses_backend_ref;
if (btype == BE_SLAVE)
{
for (i=0; i<rses->rses_nbackends; i++)
{
BACKEND* b = backend_ref[i].bref_backend;
if (backend_ref[i].bref_dcb != NULL &&
SERVER_IS_SLAVE(b->backend_server) &&
(smallest_nconn == -1 ||
b->backend_conn_count < smallest_nconn))
{
*p_dcb = backend_ref[i].bref_dcb;
smallest_nconn = b->backend_conn_count;
succp = true;
}
}
if (!succp)
{
backend_ref = rses->rses_master_ref;
if (backend_ref->bref_dcb != NULL)
{
*p_dcb = backend_ref->bref_dcb;
succp = true;
ss_dassert(
SERVER_IS_MASTER(backend_ref->bref_backend->backend_server) &&
smallest_nconn == -1);
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Warning : No slaves connected nor "
"available. Choosing master %s:%d "
"instead.",
backend_ref->bref_backend->backend_server->name,
backend_ref->bref_backend->backend_server->port)));
}
}
ss_dassert(succp);
}
else if (btype == BE_MASTER || BE_JOINED)
{
for (i=0; i<rses->rses_nbackends; i++)
{
BACKEND* b = backend_ref[i].bref_backend;
if (backend_ref[i].bref_dcb != NULL &&
(SERVER_IS_MASTER(b->backend_server) ||
SERVER_IS_JOINED(b->backend_server)))
{
*p_dcb = backend_ref[i].bref_dcb;
succp = true;
goto return_succp;
}
}
}
return_succp:
return succp;
}
/**
* The main routing entry, this is called with every packet that is
* received and has to be forwarded to the backend database.
*
* The routeQuery will make the routing decision based on the contents
* of the instance, session and the query itself in the queue. The
* data in the queue may not represent a complete query, it represents
* the data that has been received. The query router itself is responsible
* for buffering the partial query, a later call to the query router will
* contain the remainder, or part thereof of the query.
*
* @param instance The query router instance
* @param session The session associated with the client
* @param queue Gateway buffer queue with the packets received
*
* @return The number of queries forwarded
*/
static int routeQuery(
ROUTER* instance,
void* router_session,
GWBUF* querybuf)
{
skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN;
GWBUF* plainsqlbuf = NULL;
char* querystr = NULL;
char* startpos;
unsigned char packet_type;
uint8_t* packet;
int ret = 0;
DCB* master_dcb = NULL;
DCB* slave_dcb = NULL;
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance;
ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
bool rses_is_closed = false;
size_t len;
CHK_CLIENT_RSES(router_cli_ses);
/** Dirty read for quick check if router is closed. */
if (router_cli_ses->rses_closed)
{
rses_is_closed = true;
}
packet = GWBUF_DATA(querybuf);
packet_type = packet[4];
if (rses_is_closed)
{
LOGIF(LE,
(skygw_log_write_flush(
LOGFILE_ERROR,
"Error: Failed to route %s:%s:\"%s\" to "
"backend server. %s.",
STRPACKETTYPE(packet_type),
STRQTYPE(qtype),
(querystr == NULL ? "(empty)" : querystr),
(rses_is_closed ? "Router was closed" :
"Router has no backend servers where to "
"route to"))));
goto return_ret;
}
inst->stats.n_queries++;
startpos = (char *)&packet[5];
master_dcb = router_cli_ses->rses_master_ref->bref_dcb;
CHK_DCB(master_dcb);
switch(packet_type) {
case COM_QUIT: /**< 1 QUIT will close all sessions */
case COM_INIT_DB: /**< 2 DDL must go to the master */
case COM_REFRESH: /**< 7 - I guess this is session but not sure */
case COM_DEBUG: /**< 0d all servers dump debug info to stdout */
case COM_PING: /**< 0e all servers are pinged */
case COM_CHANGE_USER: /**< 11 all servers change it accordingly */
qtype = QUERY_TYPE_SESSION_WRITE;
break;
case COM_CREATE_DB: /**< 5 DDL must go to the master */
case COM_DROP_DB: /**< 6 DDL must go to the master */
qtype = QUERY_TYPE_WRITE;
break;
case COM_QUERY:
plainsqlbuf = gwbuf_clone_transform(querybuf,
GWBUF_TYPE_PLAINSQL);
len = GWBUF_LENGTH(plainsqlbuf);
/** unnecessary if buffer includes additional terminating null */
querystr = (char *)malloc(len+1);
memcpy(querystr, startpos, len);
memset(&querystr[len], 0, 1);
// querystr = (char *)GWBUF_DATA(plainsqlbuf);
/*
* querystr = master_dcb->func.getquerystr(
* (void *) gwbuf_clone(querybuf),
* &querystr_is_copy);
*/
qtype = skygw_query_classifier_get_type(querystr, 0);
break;
case COM_SHUTDOWN: /**< 8 where should shutdown be routed ? */
case COM_STATISTICS: /**< 9 ? */
case COM_PROCESS_INFO: /**< 0a ? */
case COM_CONNECT: /**< 0b ? */
case COM_PROCESS_KILL: /**< 0c ? */
case COM_TIME: /**< 0f should this be run in gateway ? */
case COM_DELAYED_INSERT: /**< 10 ? */
case COM_DAEMON: /**< 1d ? */
default:
break;
} /**< switch by packet type */
#if 0
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE,
"String\t\"%s\"",
querystr == NULL ? "(empty)" : querystr)));
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE,
"Packet type\t%s",
STRPACKETTYPE(packet_type))));
#endif
#if defined(AUTOCOMMIT_OPT)
if ((QUERY_IS_TYPE(qtype, QUERY_TYPE_DISABLE_AUTOCOMMIT) &&
!router_cli_ses->rses_autocommit_enabled) ||
(QUERY_IS_TYPE(qtype, QUERY_TYPE_ENABLE_AUTOCOMMIT) &&
router_cli_ses->rses_autocommit_enabled))
{
/** reply directly to client */
}
#endif
/**
* If autocommit is disabled or transaction is explicitly started
* transaction becomes active and master gets all statements until
* transaction is committed and autocommit is enabled again.
*/
if (router_cli_ses->rses_autocommit_enabled &&
QUERY_IS_TYPE(qtype, QUERY_TYPE_DISABLE_AUTOCOMMIT))
{
router_cli_ses->rses_autocommit_enabled = false;
if (!router_cli_ses->rses_transaction_active)
{
router_cli_ses->rses_transaction_active = true;
}
}
else if (!router_cli_ses->rses_transaction_active &&
QUERY_IS_TYPE(qtype, QUERY_TYPE_BEGIN_TRX))
{
router_cli_ses->rses_transaction_active = true;
}
/**
* Explicit COMMIT and ROLLBACK, implicit COMMIT.
*/
if (router_cli_ses->rses_autocommit_enabled &&
router_cli_ses->rses_transaction_active &&
(QUERY_IS_TYPE(qtype,QUERY_TYPE_COMMIT) ||
QUERY_IS_TYPE(qtype,QUERY_TYPE_ROLLBACK)))
{
router_cli_ses->rses_transaction_active = false;
}
else if (!router_cli_ses->rses_autocommit_enabled &&
QUERY_IS_TYPE(qtype, QUERY_TYPE_ENABLE_AUTOCOMMIT))
{
router_cli_ses->rses_autocommit_enabled = true;
router_cli_ses->rses_transaction_active = false;
}
/**
* Session update is always routed in the same way.
*/
if (QUERY_IS_TYPE(qtype, QUERY_TYPE_SESSION_WRITE))
{
bool succp = route_session_write(
router_cli_ses,
querybuf,
inst,
packet_type,
qtype);
if (succp)
{
ret = 1;
}
ss_dassert(succp);
ss_dassert(ret == 1);
goto return_ret;
}
else if (QUERY_IS_TYPE(qtype, QUERY_TYPE_READ) &&
!router_cli_ses->rses_transaction_active)
{
bool succp;
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"[%s.%d]\tRead-only query, routing to Slave.",
inst->service->name,
router_cli_ses->rses_id)));
ss_dassert(QUERY_IS_TYPE(qtype, QUERY_TYPE_READ));
/** Lock router session */
if (!rses_begin_locked_router_action(router_cli_ses))
{
goto return_ret;
}
succp = get_dcb(&slave_dcb, router_cli_ses, BE_SLAVE);
if (succp)
{
if ((ret = slave_dcb->func.write(slave_dcb, querybuf)) == 1)
{
atomic_add(&inst->stats.n_slave, 1);
}
else
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Routing query \"%s\" failed.",
querystr)));
}
}
rses_end_locked_router_action(router_cli_ses);
ss_dassert(succp);
goto return_ret;
}
else
{
bool succp = true;
if (LOG_IS_ENABLED(LOGFILE_TRACE))
{
if (router_cli_ses->rses_transaction_active) /*< all to master */
{
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Transaction is active, routing to Master.")));
}
else
{
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Begin transaction, write or unspecified type, "
"routing to Master.")));
}
}
/** Lock router session */
if (!rses_begin_locked_router_action(router_cli_ses))
{
goto return_ret;
}
if (master_dcb == NULL)
{
succp = get_dcb(&master_dcb, router_cli_ses, BE_MASTER);
}
if (succp)
{
if ((ret = master_dcb->func.write(master_dcb, querybuf)) == 1)
{
atomic_add(&inst->stats.n_master, 1);
}
}
rses_end_locked_router_action(router_cli_ses);
ss_dassert(succp);
if (ret == 0)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Routing to master failed.")));
}
}
return_ret:
if (plainsqlbuf != NULL)
{
gwbuf_free(plainsqlbuf);
}
if (querystr != NULL)
{
free(querystr);
}
return ret;
}
/** to be inline'd */
/**
* @node Acquires lock to router client session if it is not closed.
*
* Parameters:
* @param rses - in, use
*
*
* @return true if router session was not closed. If return value is true
* it means that router is locked, and must be unlocked later. False, if
* router was closed before lock was acquired.
*
*
* @details (write detailed description here)
*
*/
static bool rses_begin_locked_router_action(
ROUTER_CLIENT_SES* rses)
{
bool succp = false;
CHK_CLIENT_RSES(rses);
if (rses->rses_closed) {
goto return_succp;
}
spinlock_acquire(&rses->rses_lock);
if (rses->rses_closed) {
spinlock_release(&rses->rses_lock);
goto return_succp;
}
succp = true;
return_succp:
if (!succp)
{
/** log that router session was closed */
}
return succp;
}
/** to be inline'd */
/**
* @node Releases router client session lock.
*
* Parameters:
* @param rses - <usage>
* <description>
*
* @return void
*
*
* @details (write detailed description here)
*
*/
static void rses_end_locked_router_action(
ROUTER_CLIENT_SES* rses)
{
CHK_CLIENT_RSES(rses);
spinlock_release(&rses->rses_lock);
}
/**
* Diagnostics routine
*
* Print query router statistics to the DCB passed in
*
* @param instance The router instance
* @param dcb The DCB for diagnostic output
*/
static void
diagnostic(ROUTER *instance, DCB *dcb)
{
ROUTER_CLIENT_SES *router_cli_ses;
ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance;
int i = 0;
spinlock_acquire(&router->lock);
router_cli_ses = router->connections;
while (router_cli_ses)
{
i++;
router_cli_ses = router_cli_ses->next;
}
spinlock_release(&router->lock);
dcb_printf(dcb,
"\tNumber of router sessions: %d\n",
router->stats.n_sessions);
dcb_printf(dcb,
"\tCurrent no. of router sessions: %d\n",
i);
dcb_printf(dcb,
"\tNumber of queries forwarded: %d\n",
router->stats.n_queries);
dcb_printf(dcb,
"\tNumber of queries forwarded to master: %d\n",
router->stats.n_master);
dcb_printf(dcb,
"\tNumber of queries forwarded to slave: %d\n",
router->stats.n_slave);
dcb_printf(dcb,
"\tNumber of queries forwarded to all: %d\n",
router->stats.n_all);
}
/**
* Client Reply routine
*
* The routine will reply to client for session change with master server data
*
* @param instance The router instance
* @param router_session The router session
* @param backend_dcb The backend DCB
* @param queue The GWBUF with reply data
*/
static void clientReply(
ROUTER* instance,
void* router_session,
GWBUF* writebuf,
DCB* backend_dcb)
{
DCB* client_dcb;
ROUTER_CLIENT_SES* router_cli_ses;
sescmd_cursor_t* scur = NULL;
backend_ref_t* backend_ref;
int i;
router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
CHK_CLIENT_RSES(router_cli_ses);
/**
* Lock router client session for secure read of router session members.
* Note that this could be done without lock by using version #
*/
if (!rses_begin_locked_router_action(router_cli_ses))
{
while ((writebuf = gwbuf_consume(
writebuf,
GWBUF_LENGTH(writebuf))) != NULL);
goto lock_failed;
}
/** Holding lock ensures that router session remains open */
ss_dassert(backend_dcb->session != NULL);
client_dcb = backend_dcb->session->client;
/** Unlock */
rses_end_locked_router_action(router_cli_ses);
/**
* 1. Check if backend received reply to sescmd.
* 2. Check sescmd's state whether OK_PACKET has been
* sent to client already and if not, lock property cursor,
* reply to client, and move property cursor forward. Finally
* release the lock.
* 3. If reply for this sescmd is sent, lock property cursor
* and
*/
if (client_dcb == NULL)
{
while ((writebuf = gwbuf_consume(
writebuf,
GWBUF_LENGTH(writebuf))) != NULL);
/** Log that client was closed before reply */
goto lock_failed;
}
/** Lock router session */
if (!rses_begin_locked_router_action(router_cli_ses))
{
/** Log to debug that router was closed */
goto lock_failed;
}
backend_ref = router_cli_ses->rses_backend_ref;
/** find backend_dcb's corresponding BACKEND */
i = 0;
while (i<router_cli_ses->rses_nbackends &&
backend_ref[i].bref_dcb != backend_dcb)
{
i++;
}
ss_dassert(backend_ref[i].bref_dcb == backend_dcb);
LOGIF(LT, tracelog_routed_query(router_cli_ses,
"reply_by_statement",
&backend_ref[i],
gwbuf_clone(writebuf)));
scur = &backend_ref[i].bref_sescmd_cur;
/**
* Active cursor means that reply is from session command
* execution. Majority of the time there are no session commands
* being executed.
*/
if (sescmd_cursor_is_active(scur))
{
writebuf = sescmd_cursor_process_replies(client_dcb,
writebuf,
scur);
}
/** Unlock router session */
rses_end_locked_router_action(router_cli_ses);
if (writebuf != NULL && client_dcb != NULL)
{
/** Write reply to client DCB */
client_dcb->func.write(client_dcb, writebuf);
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"%lu [clientReply:rwsplit] client dcb %p, "
"backend dcb %p. End of normal reply.",
pthread_self(),
client_dcb,
backend_dcb)));
}
lock_failed:
return;
}
int bref_cmp_router_conn(
const void* bref1,
const void* bref2)
{
BACKEND* b1 = ((backend_ref_t *)bref1)->bref_backend;
BACKEND* b2 = ((backend_ref_t *)bref2)->bref_backend;
return ((b1->backend_conn_count < b2->backend_conn_count) ? -1 :
((b1->backend_conn_count > b2->backend_conn_count) ? 1 : 0));
}
int bref_cmp_global_conn(
const void* bref1,
const void* bref2)
{
BACKEND* b1 = ((backend_ref_t *)bref1)->bref_backend;
BACKEND* b2 = ((backend_ref_t *)bref2)->bref_backend;
return ((b1->backend_server->stats.n_current < b2->backend_server->stats.n_current) ? -1 :
((b1->backend_server->stats.n_current > b2->backend_server->stats.n_current) ? 1 : 0));
}
int bref_cmp_behind_master(
const void* bref1,
const void* bref2)
{
return 1;
}
/**
* @node Search suitable backend servers from those of router instance.
*
* Parameters:
* @param p_master_ref - in, use, out
* Pointer to location where master's backend reference is to be stored.
* NULL is not allowed.
*
* @param backend_ref - in, use, out
* Pointer to backend server reference object array.
* NULL is not allowed.
*
* @param router_nservers - in, use
* Number of backend server pointers pointed to by b.
*
* @param max_nslaves - in, use
* Upper limit for the number of slaves. Configuration parameter or default.
*
* @param session - in, use
* MaxScale session pointer used when connection to backend is established.
*
* @param router - in, use
* Pointer to router instance. Used when server states are qualified.
*
* @return true, if at least one master and one slave was found.
*
*
* @details It is assumed that there is only one master among servers of
* a router instance. As a result, the first master found is chosen.
*/
static bool select_connect_backend_servers(
backend_ref_t** p_master_ref,
backend_ref_t* backend_ref,
int router_nservers,
int max_nslaves,
select_criteria_t select_criteria,
SESSION* session,
ROUTER_INSTANCE* router)
{
bool succp = true;
bool master_found = false;
bool master_connected = false;
int slaves_found = 0;
int slaves_connected = 0;
int i;
const int min_nslaves = 0; /*< not configurable at the time */
bool is_synced_master;
int (*p)(const void *, const void *);
if (p_master_ref == NULL || backend_ref == NULL)
{
ss_dassert(FALSE);
succp = false;
goto return_succp;
}
/** Check slave selection criteria and set compare function */
p = criteria_cmpfun[select_criteria];
if (p == NULL)
{
succp = false;
goto return_succp;
}
if (router->bitvalue != 0) /*< 'synced' is the only bitvalue in rwsplit */
{
is_synced_master = true;
}
else
{
is_synced_master = false;
}
#if 0
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, "Servers and conns before ordering:")));
for (i=0; i<router_nservers; i++)
{
BACKEND* b = backend_ref[i].bref_backend;
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE,
"%s %d:%d",
b->backend_server->name,
b->backend_server->port,
b->backend_conn_count)));
}
#endif
/**
* Sort the pointer list to servers according to connection counts. As
* a consequence those backends having least connections are in the
* beginning of the list.
*/
qsort((void *)backend_ref, (size_t)router_nservers, sizeof(backend_ref_t), p);
if (LOG_IS_ENABLED(LOGFILE_TRACE))
{
if (select_criteria == LEAST_GLOBAL_CONNECTIONS ||
select_criteria == LEAST_ROUTER_CONNECTIONS)
{
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE,
"Servers and %s connection counts:",
select_criteria == LEAST_GLOBAL_CONNECTIONS ?
"all MaxScale" : "router")));
for (i=0; i<router_nservers; i++)
{
BACKEND* b = backend_ref[i].bref_backend;
switch(select_criteria) {
case LEAST_GLOBAL_CONNECTIONS:
LOGIF(LT, (skygw_log_write_flush(LOGFILE_TRACE,
"%s %d:%d",
b->backend_server->name,
b->backend_server->port,
b->backend_server->stats.n_current)));
break;
case LEAST_ROUTER_CONNECTIONS:
LOGIF(LT, (skygw_log_write_flush(LOGFILE_TRACE,
"%s %d:%d",
b->backend_server->name,
b->backend_server->port,
b->backend_conn_count)));
break;
default:
break;
}
}
}
}
/**
* Choose at least 1+1 (master and slave) and at most 1+max_nslaves
* servers from the sorted list. First master found is selected.
*/
for (i=0;
i<router_nservers && (slaves_connected < max_nslaves || !master_connected);
i++)
{
BACKEND* b = backend_ref[i].bref_backend;
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"%lu [select_backend_servers] Examine server "
"%s:%d with %d connections. Status is %d, "
"router->bitvalue is %d",
pthread_self(),
b->backend_server->name,
b->backend_server->port,
b->backend_conn_count,
b->backend_server->status,
router->bitmask)));
if (SERVER_IS_RUNNING(b->backend_server) &&
((b->backend_server->status & router->bitmask) ==
router->bitvalue))
{
if (slaves_found < max_nslaves &&
SERVER_IS_SLAVE(b->backend_server))
{
slaves_found += 1;
backend_ref[i].bref_dcb = dcb_connect(
b->backend_server,
session,
b->backend_server->protocol);
if (backend_ref[i].bref_dcb != NULL)
{
slaves_connected += 1;
/**
* Increase backend connection counter.
* Server's stats are _increased_ in
* dcb.c:dcb_alloc !
* But decreased in the calling function
* of dcb_close.
*/
atomic_add(&b->backend_conn_count, 1);
}
else
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Unable to establish "
"connection with slave %s:%d",
b->backend_server->name,
b->backend_server->port)));
/* handle connect error */
}
}
else if (!master_connected &&
(SERVER_IS_MASTER(b->backend_server) ||
SERVER_IS_JOINED(b->backend_server)))
{
master_found = true;
backend_ref[i].bref_dcb = dcb_connect(
b->backend_server,
session,
b->backend_server->protocol);
if (backend_ref[i].bref_dcb != NULL)
{
master_connected = true;
*p_master_ref = &backend_ref[i];
/** Increase backend connection counter */
/** Increase backend connection counter */
atomic_add(&b->backend_server->stats.n_current, 1);
atomic_add(&b->backend_server->stats.n_connections, 1);
atomic_add(&b->backend_conn_count, 1);
}
else
{
succp = false;
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Unable to establish "
"connection with master %s:%d",
b->backend_server->name,
b->backend_server->port)));
/* handle connect error */
}
}
}
} /*< for */
/**
* Successful cases
*/
if (master_connected &&
slaves_connected >= min_nslaves &&
slaves_connected <= max_nslaves)
{
succp = true;
if (slaves_connected == 0 && slaves_found > 0)
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"Warning : Couldn't connect to any of the %d "
"slaves. Routing to %s only.",
slaves_found,
(is_synced_master ? "Galera nodes" : "Master"))));
LOGIF(LM, (skygw_log_write(
LOGFILE_MESSAGE,
"* Warning : Couldn't connect to any of the %d "
"slaves. Routing to %s only.",
slaves_found,
(is_synced_master ? "Galera nodes" : "Master"))));
}
else if (slaves_found == 0)
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"Warning : Couldn't find any slaves from existing "
"%d servers. Routing to %s only.",
router_nservers,
(is_synced_master ? "Galera nodes" : "Master"))));
LOGIF(LM, (skygw_log_write(
LOGFILE_MESSAGE,
"* Warning : Couldn't find any slaves from existing "
"%d servers. Routing to %s only.",
router_nservers,
(is_synced_master ? "Galera nodes" : "Master"))));
}
else if (slaves_connected < max_nslaves)
{
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"Note : Couldn't connect to maximum number of "
"slaves. Connected successfully to %d slaves "
"of %d of them.",
slaves_connected,
slaves_found)));
}
if (LOG_IS_ENABLED(LT))
{
for (i=0; i<router_nservers; i++)
{
BACKEND* b = backend_ref[i].bref_backend;
if (backend_ref[i].bref_dcb != NULL)
{
backend_type_t btype = BACKEND_TYPE(b);
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Selected %s in \t%s:%d",
(btype == BE_MASTER ? "master" :
(btype == BE_SLAVE ? "slave" :
(btype == BE_JOINED ? "galera node" :
"unknown node type"))),
b->backend_server->name,
b->backend_server->port)));
}
} /* for */
}
}
/**
* Failure cases
*/
else
{
if (!master_found)
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"Error : Couldn't find suitable %s from %d "
"candidates.",
(is_synced_master ? "Galera node" : "Master"),
router_nservers)));
LOGIF(LM, (skygw_log_write(
LOGFILE_MESSAGE,
"* Error : Couldn't find suitable %s from %d "
"candidates.",
(is_synced_master ? "Galera node" : "Master"),
router_nservers)));
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Error : Couldn't find suitable %s from %d "
"candidates.",
(is_synced_master ? "Galera node" : "Master"),
router_nservers)));
}
else if (!master_connected)
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"Error : Couldn't connect to any %s although "
"there exists at least one %s node in the "
"cluster.",
(is_synced_master ? "Galera node" : "Master"),
(is_synced_master ? "Galera node" : "Master"))));
LOGIF(LM, (skygw_log_write(
LOGFILE_MESSAGE,
"* Error : Couldn't connect to any %s although "
"there exists at least one %s node in the "
"cluster.",
(is_synced_master ? "Galera node" : "Master"),
(is_synced_master ? "Galera node" : "Master"))));
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Error : Couldn't connect to any %s although "
"there exists at least one %s node in the "
"cluster.",
(is_synced_master ? "Galera node" : "Master"),
(is_synced_master ? "Galera node" : "Master"))));
}
if (slaves_connected < min_nslaves)
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"Error : Couldn't establish required amount of "
"slave connections for router session.")));
LOGIF(LM, (skygw_log_write(
LOGFILE_MESSAGE,
"*Error : Couldn't establish required amount of "
"slave connections for router session.")));
}
/** Clean up connections */
for (i=0; i<router_nservers; i++)
{
if (backend_ref[i].bref_dcb != NULL)
{
ss_dassert(backend_ref[i].bref_backend->backend_conn_count > 0);
/** disconnect opened connections */
backend_ref[i].bref_dcb->func.close(backend_ref[i].bref_dcb);
atomic_add(&backend_ref[i].bref_backend->backend_conn_count, -1);
}
}
master_connected = false;
slaves_connected = 0;
}
return_succp:
return succp;
}
/**
* Create a generic router session property strcture.
*/
static rses_property_t* rses_property_init(
rses_property_type_t prop_type)
{
rses_property_t* prop;
prop = (rses_property_t*)calloc(1, sizeof(rses_property_t));
if (prop == NULL)
{
goto return_prop;
}
prop->rses_prop_type = prop_type;
#if defined(SS_DEBUG)
prop->rses_prop_chk_top = CHK_NUM_ROUTER_PROPERTY;
prop->rses_prop_chk_tail = CHK_NUM_ROUTER_PROPERTY;
#endif
return_prop:
CHK_RSES_PROP(prop);
return prop;
}
/**
* Property is freed at the end of router client session.
*/
static void rses_property_done(
rses_property_t* prop)
{
CHK_RSES_PROP(prop);
switch (prop->rses_prop_type) {
case RSES_PROP_TYPE_SESCMD:
mysql_sescmd_done(&prop->rses_prop_data.sescmd);
break;
default:
LOGIF(LD, (skygw_log_write_flush(
LOGFILE_DEBUG,
"%lu [rses_property_done] Unknown property type %d "
"in property %p",
pthread_self(),
prop->rses_prop_type,
prop)));
ss_dassert(false);
break;
}
free(prop);
}
/**
* Add property to the router_client_ses structure's rses_properties
* array. The slot is determined by the type of property.
* In each slot there is a list of properties of similar type.
*
* Router client session must be locked.
*/
static void rses_property_add(
ROUTER_CLIENT_SES* rses,
rses_property_t* prop)
{
rses_property_t* p;
CHK_CLIENT_RSES(rses);
CHK_RSES_PROP(prop);
ss_dassert(SPINLOCK_IS_LOCKED(&rses->rses_lock));
prop->rses_prop_rsession = rses;
p = rses->rses_properties[prop->rses_prop_type];
if (p == NULL)
{
rses->rses_properties[prop->rses_prop_type] = prop;
}
else
{
while (p->rses_prop_next != NULL)
{
p = p->rses_prop_next;
}
p->rses_prop_next = prop;
}
}
/**
* Router session must be locked.
* Return session command pointer if succeed, NULL if failed.
*/
static mysql_sescmd_t* rses_property_get_sescmd(
rses_property_t* prop)
{
mysql_sescmd_t* sescmd;
CHK_RSES_PROP(prop);
ss_dassert(prop->rses_prop_rsession == NULL ||
SPINLOCK_IS_LOCKED(&prop->rses_prop_rsession->rses_lock));
sescmd = &prop->rses_prop_data.sescmd;
if (sescmd != NULL)
{
CHK_MYSQL_SESCMD(sescmd);
}
return sescmd;
}
/**
static void rses_begin_locked_property_action(
rses_property_t* prop)
{
CHK_RSES_PROP(prop);
spinlock_acquire(&prop->rses_prop_lock);
}
static void rses_end_locked_property_action(
rses_property_t* prop)
{
CHK_RSES_PROP(prop);
spinlock_release(&prop->rses_prop_lock);
}
*/
/**
* Create session command property.
*/
static mysql_sescmd_t* mysql_sescmd_init (
rses_property_t* rses_prop,
GWBUF* sescmd_buf,
unsigned char packet_type,
ROUTER_CLIENT_SES* rses)
{
mysql_sescmd_t* sescmd;
CHK_RSES_PROP(rses_prop);
/** Can't call rses_property_get_sescmd with uninitialized sescmd */
sescmd = &rses_prop->rses_prop_data.sescmd;
sescmd->my_sescmd_prop = rses_prop; /*< reference to owning property */
#if defined(SS_DEBUG)
sescmd->my_sescmd_chk_top = CHK_NUM_MY_SESCMD;
sescmd->my_sescmd_chk_tail = CHK_NUM_MY_SESCMD;
#endif
/** Set session command buffer */
sescmd->my_sescmd_buf = sescmd_buf;
sescmd->my_sescmd_packet_type = packet_type;
return sescmd;
}
static void mysql_sescmd_done(
mysql_sescmd_t* sescmd)
{
CHK_RSES_PROP(sescmd->my_sescmd_prop);
gwbuf_free(sescmd->my_sescmd_buf);
memset(sescmd, 0, sizeof(mysql_sescmd_t));
}
/**
* All cases where backend message starts at least with one response to session
* command are handled here.
* Read session commands from property list. If command is already replied,
* discard packet. Else send reply to client. In both cases move cursor forward
* until all session command replies are handled.
*
* Cases that are expected to happen and which are handled:
* s = response not yet replied to client, S = already replied response,
* q = query
* 1. q+ for example : select * from mysql.user
* 2. s+ for example : set autocommit=1
* 3. S+
* 4. sq+
* 5. Sq+
* 6. Ss+
* 7. Ss+q+
* 8. S+q+
* 9. s+q+
*/
static GWBUF* sescmd_cursor_process_replies(
DCB* client_dcb,
GWBUF* replybuf,
sescmd_cursor_t* scur)
{
const size_t headerlen = 4; /*< mysql packet header */
uint8_t* packet;
size_t packetlen;
mysql_sescmd_t* scmd;
ss_dassert(SPINLOCK_IS_LOCKED(&(scur->scmd_cur_rses->rses_lock)));
scmd = sescmd_cursor_get_command(scur);
CHK_DCB(client_dcb);
CHK_GWBUF(replybuf);
/**
* Walk through packets in the message and the list of session
*commands.
*/
while (scmd != NULL && replybuf != NULL)
{
if (scmd->my_sescmd_is_replied)
{
/**
* Discard heading packets if their related command is
* already replied.
*/
CHK_GWBUF(replybuf);
packet = (uint8_t *)GWBUF_DATA(replybuf);
packetlen = packet[0]+packet[1]*256+packet[2]*256*256;
replybuf = gwbuf_consume(replybuf, packetlen+headerlen);
/*
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"%lu [sescmd_cursor_process_replies] cmd %p "
"is already replied. Discarded %d bytes from "
"the %s replybuffer.",
pthread_self(),
scmd,
packetlen+headerlen,
STRBETYPE(scur->scmd_cur_be_type))));
*/
}
else
{
/** Mark the rest session commands as replied */
scmd->my_sescmd_is_replied = true;
/*
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"%lu [sescmd_cursor_process_replies] Marked "
"cmd %p to as replied. Left message to %s's "
"buffer for reply.",
pthread_self(),
scmd,
STRBETYPE(scur->scmd_cur_be_type))));
*/
}
if (sescmd_cursor_next(scur))
{
scmd = sescmd_cursor_get_command(scur);
}
else
{
scmd = NULL;
/** All session commands are replied */
scur->scmd_cur_active = false;
}
}
ss_dassert(replybuf == NULL || *scur->scmd_cur_ptr_property == NULL);
return replybuf;
}
/**
* Get the address of current session command.
*
* Router session must be locked */
static mysql_sescmd_t* sescmd_cursor_get_command(
sescmd_cursor_t* scur)
{
mysql_sescmd_t* scmd;
ss_dassert(SPINLOCK_IS_LOCKED(&(scur->scmd_cur_rses->rses_lock)));
scur->scmd_cur_cmd = rses_property_get_sescmd(*scur->scmd_cur_ptr_property);
CHK_MYSQL_SESCMD(scur->scmd_cur_cmd);
scmd = scur->scmd_cur_cmd;
return scmd;
}
/** router must be locked */
static bool sescmd_cursor_is_active(
sescmd_cursor_t* sescmd_cursor)
{
bool succp;
ss_dassert(SPINLOCK_IS_LOCKED(&sescmd_cursor->scmd_cur_rses->rses_lock));
succp = sescmd_cursor->scmd_cur_active;
return succp;
}
/** router must be locked */
static void sescmd_cursor_set_active(
sescmd_cursor_t* sescmd_cursor,
bool value)
{
ss_dassert(SPINLOCK_IS_LOCKED(&sescmd_cursor->scmd_cur_rses->rses_lock));
/** avoid calling unnecessarily */
ss_dassert(sescmd_cursor->scmd_cur_active != value);
sescmd_cursor->scmd_cur_active = value;
}
/**
* Clone session command's command buffer.
* Router session must be locked
*/
static GWBUF* sescmd_cursor_clone_querybuf(
sescmd_cursor_t* scur)
{
GWBUF* buf;
ss_dassert(scur->scmd_cur_cmd != NULL);
buf = gwbuf_clone(scur->scmd_cur_cmd->my_sescmd_buf);
CHK_GWBUF(buf);
return buf;
}
/**
* If session command cursor is passive, sends the command to backend for
* execution.
*
* Returns true if command was sent or added successfully to the queue.
* Returns false if command sending failed or if there are no pending session
* commands.
*
* Router session must be locked.
*/
static bool execute_sescmd_in_backend(
backend_ref_t* backend_ref)
{
DCB* dcb;
bool succp = true;
int rc = 0;
sescmd_cursor_t* scur;
if (backend_ref->bref_dcb == NULL)
{
goto return_succp;
}
dcb = backend_ref->bref_dcb;
CHK_DCB(dcb);
CHK_BACKEND_REF(backend_ref);
/**
* Get cursor pointer and copy of command buffer to cursor.
*/
scur = &backend_ref->bref_sescmd_cur;
/** Return if there are no pending ses commands */
if (sescmd_cursor_get_command(scur) == NULL)
{
succp = false;
goto return_succp;
}
if (!sescmd_cursor_is_active(scur))
{
/** Cursor is left active when function returns. */
sescmd_cursor_set_active(scur, true);
}
LOGIF(LT, tracelog_routed_query(scur->scmd_cur_rses,
"execute_sescmd_in_backend",
backend_ref,
sescmd_cursor_clone_querybuf(scur)));
switch (scur->scmd_cur_cmd->my_sescmd_packet_type) {
case COM_CHANGE_USER:
rc = dcb->func.auth(
dcb,
NULL,
dcb->session,
sescmd_cursor_clone_querybuf(scur));
break;
case COM_QUERY:
case COM_INIT_DB:
default:
rc = dcb->func.write(
dcb,
sescmd_cursor_clone_querybuf(scur));
break;
}
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"%lu [execute_sescmd_in_backend] Routed %s cmd %p.",
pthread_self(),
STRPACKETTYPE(scur->scmd_cur_cmd->my_sescmd_packet_type),
scur->scmd_cur_cmd)));
if (rc != 1)
{
succp = false;
}
return_succp:
return succp;
}
/**
* Moves cursor to next property and copied address of its sescmd to cursor.
* Current propery must be non-null.
* If current property is the last on the list, *scur->scmd_ptr_property == NULL
*
* Router session must be locked
*/
static bool sescmd_cursor_next(
sescmd_cursor_t* scur)
{
bool succp = false;
rses_property_t* prop_curr;
rses_property_t* prop_next;
ss_dassert(scur != NULL);
ss_dassert(*(scur->scmd_cur_ptr_property) != NULL);
ss_dassert(SPINLOCK_IS_LOCKED(
&(*(scur->scmd_cur_ptr_property))->rses_prop_rsession->rses_lock));
/** Illegal situation */
if (scur == NULL ||
*scur->scmd_cur_ptr_property == NULL ||
scur->scmd_cur_cmd == NULL)
{
/** Log error */
goto return_succp;
}
prop_curr = *(scur->scmd_cur_ptr_property);
CHK_MYSQL_SESCMD(scur->scmd_cur_cmd);
ss_dassert(prop_curr == mysql_sescmd_get_property(scur->scmd_cur_cmd));
CHK_RSES_PROP(prop_curr);
/** Copy address of pointer to next property */
scur->scmd_cur_ptr_property = &(prop_curr->rses_prop_next);
prop_next = *scur->scmd_cur_ptr_property;
ss_dassert(prop_next == *(scur->scmd_cur_ptr_property));
/** If there is a next property move forward */
if (prop_next != NULL)
{
CHK_RSES_PROP(prop_next);
CHK_RSES_PROP((*(scur->scmd_cur_ptr_property)));
/** Get pointer to next property's sescmd */
scur->scmd_cur_cmd = rses_property_get_sescmd(prop_next);
ss_dassert(prop_next == scur->scmd_cur_cmd->my_sescmd_prop);
CHK_MYSQL_SESCMD(scur->scmd_cur_cmd);
CHK_RSES_PROP(scur->scmd_cur_cmd->my_sescmd_prop);
}
else
{
/** No more properties, can't proceed. */
goto return_succp;
}
if (scur->scmd_cur_cmd != NULL)
{
succp = true;
}
else
{
ss_dassert(false); /*< Log error, sescmd shouldn't be NULL */
}
return_succp:
return succp;
}
static rses_property_t* mysql_sescmd_get_property(
mysql_sescmd_t* scmd)
{
CHK_MYSQL_SESCMD(scmd);
return scmd->my_sescmd_prop;
}
static void tracelog_routed_query(
ROUTER_CLIENT_SES* rses,
char* funcname,
backend_ref_t* bref,
GWBUF* buf)
{
uint8_t* packet = GWBUF_DATA(buf);
unsigned char packet_type = packet[4];
size_t len;
size_t buflen = GWBUF_LENGTH(buf);
char* querystr;
char* startpos = (char *)&packet[5];
BACKEND* b;
backend_type_t be_type;
DCB* dcb;
CHK_BACKEND_REF(bref);
b = bref->bref_backend;
CHK_BACKEND(b);
dcb = bref->bref_dcb;
CHK_DCB(dcb);
be_type = BACKEND_TYPE(b);
if (GWBUF_TYPE(buf) == GWBUF_TYPE_MYSQL)
{
len = packet[0];
len += 256*packet[1];
len += 256*256*packet[2];
if (packet_type == '\x03')
{
querystr = (char *)malloc(len);
memcpy(querystr, startpos, len-1);
querystr[len-1] = '\0';
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"%lu [%s] %d bytes long buf, \"%s\" -> %s:%d %s dcb %p",
pthread_self(),
funcname,
buflen,
querystr,
b->backend_server->name,
b->backend_server->port,
STRBETYPE(be_type),
dcb)));
free(querystr);
}
}
gwbuf_free(buf);
}
/**
* Return rc, rc < 0 if router session is closed. rc == 0 if there are no
* capabilities specified, rc > 0 when there are capabilities.
*/
static uint8_t getCapabilities (
ROUTER* inst,
void* router_session)
{
ROUTER_CLIENT_SES* rses = (ROUTER_CLIENT_SES *)router_session;
uint8_t rc;
if (!rses_begin_locked_router_action(rses))
{
rc = 0xff;
goto return_rc;
}
rc = rses->rses_capabilities;
rses_end_locked_router_action(rses);
return_rc:
return rc;
}
/**
* Execute in backends used by current router session.
* Save session variable commands to router session property
* struct. Thus, they can be replayed in backends which are
* started and joined later.
*
* Suppress redundant OK packets sent by backends.
*
* The first OK packet is replied to the client.
* Return true if succeed, false is returned if router session was closed or
* if execute_sescmd_in_backend failed.
*/
static bool route_session_write(
ROUTER_CLIENT_SES* router_cli_ses,
GWBUF* querybuf,
ROUTER_INSTANCE* inst,
unsigned char packet_type,
skygw_query_type_t qtype)
{
bool succp;
rses_property_t* prop;
backend_ref_t* backend_ref;
int i;
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Session write, query type\t%s, packet type %s, "
"routing to all servers.",
STRQTYPE(qtype),
STRPACKETTYPE(packet_type))));
backend_ref = router_cli_ses->rses_backend_ref;
/**
* COM_QUIT is one-way message. Server doesn't respond to that.
* Therefore reply processing is unnecessary and session
* command property is not needed. It is just routed to both
* backends.
*/
if (packet_type == COM_QUIT)
{
int rc;
succp = true;
/** Lock router session */
if (!rses_begin_locked_router_action(router_cli_ses))
{
succp = false;
goto return_succp;
}
for (i=0; i<router_cli_ses->rses_nbackends; i++)
{
DCB* dcb = backend_ref[i].bref_dcb;
if (dcb != NULL)
{
rc = dcb->func.write(dcb, gwbuf_clone(querybuf));
if (rc != 1)
{
succp = false;
}
}
}
rses_end_locked_router_action(router_cli_ses);
gwbuf_free(querybuf);
goto return_succp;
}
prop = rses_property_init(RSES_PROP_TYPE_SESCMD);
/**
* Additional reference is created to querybuf to
* prevent it from being released before properties
* are cleaned up as a part of router sessionclean-up.
*/
mysql_sescmd_init(prop, querybuf, packet_type, router_cli_ses);
/** Lock router session */
if (!rses_begin_locked_router_action(router_cli_ses))
{
rses_property_done(prop);
succp = false;
goto return_succp;
}
/** Add sescmd property to router client session */
rses_property_add(router_cli_ses, prop);
for (i=0; i<router_cli_ses->rses_nbackends; i++)
{
succp = execute_sescmd_in_backend(&backend_ref[i]);
if (!succp)
{
/** Unlock router session */
rses_end_locked_router_action(router_cli_ses);
goto return_succp;
}
}
/** Unlock router session */
rses_end_locked_router_action(router_cli_ses);
atomic_add(&inst->stats.n_all, 1);
return_succp:
return succp;
}
static void rwsplit_process_options(
ROUTER_INSTANCE* router,
char** options)
{
int i;
char* value;
select_criteria_t c;
for (i = 0; options[i]; i++)
{
if ((value = strchr(options[i], '=')) == NULL)
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR, "Warning : Unsupported "
"router option \"%s\" for "
"readwritesplit router.",
options[i])));
}
else
{
*value = 0;
value++;
if (strcmp(options[i], "slave_selection_criteria") == 0)
{
c = GET_SELECT_CRITERIA(value);
ss_dassert(
c == LEAST_GLOBAL_CONNECTIONS ||
c == LEAST_ROUTER_CONNECTIONS ||
c == LEAST_BEHIND_MASTER ||
c == UNDEFINED_CRITERIA);
if (c == UNDEFINED_CRITERIA)
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR, "Warning : Unknown "
"slave selection criteria \"%s\". "
"Allowed values are \"LEAST_GLOBAL_CONNECTIONS\", "
"LEAST_ROUTER_CONNECTIONS, "
"and \"LEAST_ROUTER_CONNECTIONS\".",
STRCRITERIA(router->rwsplit_config.rw_slave_select_criteria))));
}
else
{
router->rwsplit_config.rw_slave_select_criteria = c;
}
}
}
} /*< for */
}