Files
MaxScale/server/modules/routing/readwritesplit/readwritesplit.c
VilhoRaatikka 7c3a354fd8 Query classifier ignored implicit commits in cases of write commands. Fixed it.
Added more tests for transaction support. Mostly different cases where some command triggers implicit commit in the middle of transaction.
2014-03-28 00:16:18 +02:00

1854 lines
62 KiB
C

/*
* This file is distributed as part of the SkySQL Gateway. It is free
* software: you can redistribute it and/or modify it under the terms of the
* GNU General Public License as published by the Free Software Foundation,
* version 2.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along with
* this program; if not, write to the Free Software Foundation, Inc., 51
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Copyright SkySQL Ab 2013
*/
#include <stdio.h>
#include <strings.h>
#include <string.h>
#include <stdlib.h>
#include <stdint.h>
#include <router.h>
#include <readwritesplit.h>
#include <mysql.h>
#include <skygw_utils.h>
#include <log_manager.h>
#include <query_classifier.h>
#include <dcb.h>
#include <spinlock.h>
extern int lm_enabled_logfiles_bitmask;
/**
* @file readwritesplit.c The entry points for the read/write query splitting
* router module.
*
* This file contains the entry points that comprise the API to the read write
* query splitting router.
* @verbatim
* Revision History
*
* Date Who Description
* 01/07/2013 Vilho Raatikka Initial implementation
* 15/07/2013 Massimiliano Pinto Added clientReply
* from master only in case of session change
* 17/07/2013 Massimiliano Pinto clientReply is now used by mysql_backend
* for all reply situations
* 18/07/2013 Massimiliano Pinto routeQuery now handles COM_QUIT
* as QUERY_TYPE_SESSION_WRITE
* 17/07/2014 Massimiliano Pinto Server connection counter is updated in closeSession
*
* @endverbatim
*/
static char *version_str = "V1.0.2";
static ROUTER* createInstance(SERVICE *service, char **options);
static void* newSession(ROUTER *instance, SESSION *session);
static void closeSession(ROUTER *instance, void *session);
static void freeSession(ROUTER *instance, void *session);
static int routeQuery(ROUTER *instance, void *session, GWBUF *queue);
static void diagnostic(ROUTER *instance, DCB *dcb);
static void clientReply(
ROUTER* instance,
void* router_session,
GWBUF* queue,
DCB* backend_dcb);
static uint8_t getCapabilities (ROUTER* inst, void* router_session);
static bool search_backend_servers(
BACKEND** p_master,
BACKEND** p_slave,
ROUTER_INSTANCE* router);
static ROUTER_OBJECT MyObject = {
createInstance,
newSession,
closeSession,
freeSession,
routeQuery,
diagnostic,
clientReply,
NULL,
getCapabilities
};
static bool rses_begin_locked_router_action(
ROUTER_CLIENT_SES* rses);
static void rses_end_locked_router_action(
ROUTER_CLIENT_SES* rses);
static void mysql_sescmd_done(
mysql_sescmd_t* sescmd);
static mysql_sescmd_t* mysql_sescmd_init (
rses_property_t* rses_prop,
GWBUF* sescmd_buf,
unsigned char packet_type,
ROUTER_CLIENT_SES* rses);
static rses_property_t* mysql_sescmd_get_property(
mysql_sescmd_t* scmd);
static rses_property_t* rses_property_init(
rses_property_type_t prop_type);
static void rses_property_add(
ROUTER_CLIENT_SES* rses,
rses_property_t* prop);
static void rses_property_done(
rses_property_t* prop);
static mysql_sescmd_t* rses_property_get_sescmd(
rses_property_t* prop);
static sescmd_cursor_t* rses_get_sescmd_cursor(
ROUTER_CLIENT_SES* rses,
backend_type_t be_type);
static bool execute_sescmd_in_backend(
ROUTER_CLIENT_SES* rses,
backend_type_t be_type);
static void sescmd_cursor_set_active(
sescmd_cursor_t* sescmd_cursor,
bool value);
static bool sescmd_cursor_is_active(
sescmd_cursor_t* sescmd_cursor);
static GWBUF* sescmd_cursor_clone_querybuf(
sescmd_cursor_t* scur);
static mysql_sescmd_t* sescmd_cursor_get_command(
sescmd_cursor_t* scur);
static bool sescmd_cursor_next(
sescmd_cursor_t* scur);
static GWBUF* sescmd_cursor_process_replies(
DCB* client_dcb,
GWBUF* replybuf,
sescmd_cursor_t* scur);
static bool cont_exec_sescmd_in_backend(
ROUTER_CLIENT_SES* rses,
backend_type_t be_type);
static void tracelog_routed_query(
ROUTER_CLIENT_SES* rses,
char* funcname,
DCB* dcb,
GWBUF* buf);
static SPINLOCK instlock;
static ROUTER_INSTANCE* instances;
/**
* Implementation of the mandatory version entry point
*
* @return version string of the module
*/
char *
version()
{
return version_str;
}
/**
* The module initialisation routine, called when the module
* is first loaded.
*/
void
ModuleInit()
{
LOGIF(LM, (skygw_log_write_flush(
LOGFILE_MESSAGE,
"Initializing statemend-based read/write split router module.")));
spinlock_init(&instlock);
instances = NULL;
}
/**
* The module entry point routine. It is this routine that
* must populate the structure that is referred to as the
* "module object", this is a structure with the set of
* external entry points for this module.
*
* @return The module object
*/
ROUTER_OBJECT* GetModuleObject()
{
return &MyObject;
}
/**
* Create an instance of read/write statemtn router within the MaxScale.
*
*
* @param service The service this router is being create for
* @param options The options for this query router
*
* @return NULL in failure, pointer to router in success.
*/
static ROUTER* createInstance(
SERVICE* service,
char** options)
{
ROUTER_INSTANCE* router;
SERVER* server;
int n;
int i;
if ((router = calloc(1, sizeof(ROUTER_INSTANCE))) == NULL) {
return NULL;
}
router->service = service;
spinlock_init(&router->lock);
/** Calculate number of servers */
server = service->databases;
for (n=0; server != NULL; server=server->nextdb) {
n++;
}
router->servers = (BACKEND **)calloc(n + 1, sizeof(BACKEND *));
if (router->servers == NULL)
{
free(router);
return NULL;
}
if (options != NULL)
{
LOGIF(LM, (skygw_log_write_flush(
LOGFILE_MESSAGE,
"Router options supplied to read/write statement router "
"module but none are supported. The options will be "
"ignored.")));
}
/**
* Create an array of the backend servers in the router structure to
* maintain a count of the number of connections to each
* backend server.
*/
server = service->databases;
n = 0;
while (server != NULL) {
if ((router->servers[n] = malloc(sizeof(BACKEND))) == NULL)
{
for (i = 0; i < n; i++) {
free(router->servers[i]);
}
free(router->servers);
free(router);
return NULL;
}
router->servers[n]->backend_server = server;
router->servers[n]->backend_conn_count = 0;
n += 1;
server = server->nextdb;
}
router->servers[n] = NULL;
/**
* vraa : is this necessary for readwritesplit ?
* Option : where can a read go?
* - master (only)
* - slave (only)
* - joined (to both)
*
* Process the options
*/
router->bitmask = 0;
router->bitvalue = 0;
if (options)
{
for (i = 0; options[i]; i++)
{
if (!strcasecmp(options[i], "synced"))
{
router->bitmask |= (SERVER_JOINED);
router->bitvalue |= SERVER_JOINED;
}
else
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Warning : Unsupported "
"router option \"%s\" "
"for readwritesplit router.",
options[i])));
}
}
}
/**
* We have completed the creation of the router data, so now
* insert this router into the linked list of routers
* that have been created with this module.
*/
spinlock_acquire(&instlock);
router->next = instances;
instances = router;
spinlock_release(&instlock);
return (ROUTER *)router;
}
/**
* Associate a new session with this instance of the router.
*
* The session is used to store all the data required for a particular
* client connection.
*
* @param instance The router instance data
* @param session The session itself
* @return Session specific data for this session
*/
static void* newSession(
ROUTER* router_inst,
SESSION* session)
{
BACKEND* local_backend[BE_COUNT];
ROUTER_CLIENT_SES* client_rses;
ROUTER_INSTANCE* router = (ROUTER_INSTANCE *)router_inst;
bool succp;
client_rses =
(ROUTER_CLIENT_SES *)calloc(1, sizeof(ROUTER_CLIENT_SES));
if (client_rses == NULL)
{
ss_dassert(false);
return NULL;
}
memset(local_backend, 0, BE_COUNT*sizeof(void*));
spinlock_init(&client_rses->rses_lock);
#if defined(SS_DEBUG)
client_rses->rses_chk_top = CHK_NUM_ROUTER_SES;
client_rses->rses_chk_tail = CHK_NUM_ROUTER_SES;
#endif
/** store pointers to sescmd list to both cursors */
client_rses->rses_cursor[BE_MASTER].scmd_cur_rses = client_rses;
client_rses->rses_cursor[BE_MASTER].scmd_cur_active = false;
client_rses->rses_cursor[BE_MASTER].scmd_cur_ptr_property =
&client_rses->rses_properties[RSES_PROP_TYPE_SESCMD];
client_rses->rses_cursor[BE_MASTER].scmd_cur_cmd = NULL;
client_rses->rses_cursor[BE_MASTER].scmd_cur_be_type = BE_MASTER;
client_rses->rses_cursor[BE_SLAVE].scmd_cur_rses = client_rses;
client_rses->rses_cursor[BE_SLAVE].scmd_cur_active = false;
client_rses->rses_cursor[BE_SLAVE].scmd_cur_ptr_property =
&client_rses->rses_properties[RSES_PROP_TYPE_SESCMD];
client_rses->rses_cursor[BE_SLAVE].scmd_cur_cmd = NULL;
client_rses->rses_cursor[BE_SLAVE].scmd_cur_be_type = BE_SLAVE;
/**
* Find a backend server to connect to. This is the extent of the
* load balancing algorithm we need to implement for this simple
* connection router.
*/
succp = search_backend_servers(&local_backend[BE_MASTER],
&local_backend[BE_SLAVE],
router);
/** Both Master and Slave must be found */
if (!succp) {
free(client_rses);
return NULL;
}
/**
* Open the slave connection.
*/
client_rses->rses_dcb[BE_SLAVE] = dcb_connect(
local_backend[BE_SLAVE]->backend_server,
session,
local_backend[BE_SLAVE]->backend_server->protocol);
if (client_rses->rses_dcb[BE_SLAVE] == NULL) {
ss_dassert(session->refcount == 1);
free(client_rses);
return NULL;
}
/**
* Open the master connection.
*/
client_rses->rses_dcb[BE_MASTER] = dcb_connect(
local_backend[BE_MASTER]->backend_server,
session,
local_backend[BE_MASTER]->backend_server->protocol);
if (client_rses->rses_dcb[BE_MASTER] == NULL)
{
/** Close slave connection first. */
client_rses->rses_dcb[BE_SLAVE]->func.close(client_rses->rses_dcb[BE_SLAVE]);
free(client_rses);
return NULL;
}
/**
* We now have a master and a slave server with the least connections.
* Bump the connection counts for these servers.
*/
atomic_add(&local_backend[BE_SLAVE]->backend_conn_count, 1);
atomic_add(&local_backend[BE_MASTER]->backend_conn_count, 1);
client_rses->rses_backend[BE_SLAVE] = local_backend[BE_SLAVE];
client_rses->rses_backend[BE_MASTER] = local_backend[BE_MASTER];
router->stats.n_sessions += 1;
client_rses->rses_capabilities = RCAP_TYPE_STMT_INPUT;
/**
* Version is bigger than zero once initialized.
*/
atomic_add(&client_rses->rses_versno, 2);
ss_dassert(client_rses->rses_versno == 2);
/**
* Add this session to end of the list of active sessions in router.
*/
spinlock_acquire(&router->lock);
client_rses->next = router->connections;
router->connections = client_rses;
spinlock_release(&router->lock);
CHK_CLIENT_RSES(client_rses);
return (void *)client_rses;
}
/**
* Close a session with the router, this is the mechanism
* by which a router may cleanup data structure etc.
*
* @param instance The router instance data
* @param session The session being closed
*/
static void closeSession(
ROUTER* instance,
void* router_session)
{
ROUTER_CLIENT_SES* router_cli_ses;
DCB* slave_dcb;
DCB* master_dcb;
router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
CHK_CLIENT_RSES(router_cli_ses);
/**
* Lock router client session for secure read and update.
*/
if (rses_begin_locked_router_action(router_cli_ses))
{
/** decrease server current connection counters */
atomic_add(&router_cli_ses->rses_backend[BE_SLAVE]->backend_server->stats.n_current, -1);
atomic_add(&router_cli_ses->rses_backend[BE_MASTER]->backend_server->stats.n_current, -1);
slave_dcb = router_cli_ses->rses_dcb[BE_SLAVE];
router_cli_ses->rses_dcb[BE_SLAVE] = NULL;
master_dcb = router_cli_ses->rses_dcb[BE_MASTER];
router_cli_ses->rses_dcb[BE_MASTER] = NULL;
router_cli_ses->rses_closed = true;
/** Unlock */
rses_end_locked_router_action(router_cli_ses);
/**
* Close the backend server connections
*/
if (slave_dcb != NULL) {
CHK_DCB(slave_dcb);
slave_dcb->func.close(slave_dcb);
}
if (master_dcb != NULL) {
master_dcb->func.close(master_dcb);
CHK_DCB(master_dcb);
}
}
}
static void freeSession(
ROUTER* router_instance,
void* router_client_session)
{
ROUTER_CLIENT_SES* router_cli_ses;
ROUTER_INSTANCE* router;
int i;
router_cli_ses = (ROUTER_CLIENT_SES *)router_client_session;
router = (ROUTER_INSTANCE *)router_instance;
atomic_add(&router_cli_ses->rses_backend[BE_SLAVE]->backend_conn_count, -1);
atomic_add(&router_cli_ses->rses_backend[BE_MASTER]->backend_conn_count, -1);
spinlock_acquire(&router->lock);
if (router->connections == router_cli_ses) {
router->connections = router_cli_ses->next;
} else {
ROUTER_CLIENT_SES* ptr = router->connections;
while (ptr && ptr->next != router_cli_ses) {
ptr = ptr->next;
}
if (ptr) {
ptr->next = router_cli_ses->next;
}
}
spinlock_release(&router->lock);
/**
* For each property type, walk through the list, finalize properties
* and free the allocated memory.
*/
for (i=RSES_PROP_TYPE_FIRST; i<RSES_PROP_TYPE_COUNT; i++)
{
rses_property_t* p = router_cli_ses->rses_properties[i];
rses_property_t* q = p;
while (p != NULL)
{
q = p->rses_prop_next;
rses_property_done(p);
p = q;
}
}
/*
* We are no longer in the linked list, free
* all the memory and other resources associated
* to the client session.
*/
free(router_cli_ses);
return;
}
/**
* The main routing entry, this is called with every packet that is
* received and has to be forwarded to the backend database.
*
* The routeQuery will make the routing decision based on the contents
* of the instance, session and the query itself in the queue. The
* data in the queue may not represent a complete query, it represents
* the data that has been received. The query router itself is responsible
* for buffering the partial query, a later call to the query router will
* contain the remainder, or part thereof of the query.
*
* @param instance The query router instance
* @param session The session associated with the client
* @param queue Gateway buffer queue with the packets received
*
* @return The number of queries forwarded
*/
static int routeQuery(
ROUTER* instance,
void* router_session,
GWBUF* querybuf)
{
skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN;
GWBUF* plainsqlbuf = NULL;
char* querystr = NULL;
char* startpos;
unsigned char packet_type;
uint8_t* packet;
int ret = 0;
DCB* master_dcb = NULL;
DCB* slave_dcb = NULL;
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance;
ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
bool rses_is_closed;
rses_property_t* prop;
size_t len;
static bool transaction_active;
CHK_CLIENT_RSES(router_cli_ses);
/** Dirty read for quick check if router is closed. */
if (router_cli_ses->rses_closed)
{
rses_is_closed = true;
}
else
{
/*< Lock router client session for secure read of DCBs */
rses_is_closed =
!(rses_begin_locked_router_action(router_cli_ses));
}
if (!rses_is_closed)
{
master_dcb = router_cli_ses->rses_dcb[BE_MASTER];
slave_dcb = router_cli_ses->rses_dcb[BE_SLAVE];
/** unlock */
rses_end_locked_router_action(router_cli_ses);
}
packet = GWBUF_DATA(querybuf);
packet_type = packet[4];
if (rses_is_closed || (master_dcb == NULL && slave_dcb == NULL))
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error: Failed to route %s:%s:\"%s\" to "
"backend server. %s.",
STRPACKETTYPE(packet_type),
STRQTYPE(qtype),
(querystr == NULL ? "(empty)" : querystr),
(rses_is_closed ? "Router was closed" :
"Router has no backend servers where to "
"route to"))));
goto return_ret;
}
inst->stats.n_queries++;
startpos = (char *)&packet[5];
switch(packet_type) {
case COM_QUIT: /**< 1 QUIT will close all sessions */
case COM_INIT_DB: /**< 2 DDL must go to the master */
case COM_REFRESH: /**< 7 - I guess this is session but not sure */
case COM_DEBUG: /**< 0d all servers dump debug info to stdout */
case COM_PING: /**< 0e all servers are pinged */
case COM_CHANGE_USER: /**< 11 all servers change it accordingly */
qtype = QUERY_TYPE_SESSION_WRITE;
break;
case COM_CREATE_DB: /**< 5 DDL must go to the master */
case COM_DROP_DB: /**< 6 DDL must go to the master */
qtype = QUERY_TYPE_WRITE;
break;
case COM_QUERY:
plainsqlbuf = gwbuf_clone_transform(querybuf,
GWBUF_TYPE_PLAINSQL);
len = GWBUF_LENGTH(plainsqlbuf);
/** unnecessary if buffer includes additional terminating null */
querystr = (char *)malloc(len+1);
memcpy(querystr, startpos, len);
memset(&querystr[len], 0, 1);
// querystr = (char *)GWBUF_DATA(plainsqlbuf);
/*
querystr = master_dcb->func.getquerystr(
(void *) gwbuf_clone(querybuf),
&querystr_is_copy);
*/
qtype = skygw_query_classifier_get_type(querystr, 0);
break;
case COM_SHUTDOWN: /**< 8 where should shutdown be routed ? */
case COM_STATISTICS: /**< 9 ? */
case COM_PROCESS_INFO: /**< 0a ? */
case COM_CONNECT: /**< 0b ? */
case COM_PROCESS_KILL: /**< 0c ? */
case COM_TIME: /**< 0f should this be run in gateway ? */
case COM_DELAYED_INSERT: /**< 10 ? */
case COM_DAEMON: /**< 1d ? */
default:
break;
} /**< switch by packet type */
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE,
"String\t\"%s\"",
querystr == NULL ? "(empty)" : querystr)));
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE,
"Packet type\t%s",
STRPACKETTYPE(packet_type))));
if (QUERY_IS_TYPE(qtype,QUERY_TYPE_COMMIT) &&
transaction_active)
{
transaction_active = false;
}
switch (qtype) {
case QUERY_TYPE_WRITE:
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"%lu [routeQuery:rwsplit] Query type\t%s, "
"routing to Master.",
pthread_self(),
STRQTYPE(qtype))));
LOGIF(LT, tracelog_routed_query(router_cli_ses,
"routeQuery",
master_dcb,
gwbuf_clone(querybuf)));
ret = master_dcb->func.write(master_dcb, querybuf);
atomic_add(&inst->stats.n_master, 1);
goto return_ret;
break;
case QUERY_TYPE_READ:
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"%lu [routeQuery:rwsplit] Query type\t%s, "
"routing to %s.",
pthread_self(),
STRQTYPE(qtype),
(transaction_active ? "Master" : "Slave"))));
LOGIF(LT, tracelog_routed_query(
router_cli_ses,
"routeQuery",
(transaction_active ? master_dcb : slave_dcb),
gwbuf_clone(querybuf)));
if (transaction_active)
{
ret = master_dcb->func.write(master_dcb, querybuf);
}
else
{
ret = slave_dcb->func.write(slave_dcb, querybuf);
}
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"%lu [routeQuery:rwsplit] Routed.",
pthread_self())));
atomic_add(&inst->stats.n_slave, 1);
goto return_ret;
break;
case QUERY_TYPE_SESSION_WRITE:
/**
* Execute in backends used by current router session.
* Save session variable commands to router session property
* struct. Thus, they can be replayed in backends which are
* started and joined later.
*
* Suppress redundant OK packets sent by backends.
*
* DOES THIS ALL APPLY TO COM_QUIT AS WELL??
*
* The first OK packet is replied to the client.
*
*/
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"%lu [routeQuery:rwsplit] DCB M:%p s:%p, "
"Query type\t%s, "
"packet type %s, routing to all servers.",
pthread_self(),
master_dcb,
slave_dcb,
STRQTYPE(qtype),
STRPACKETTYPE(packet_type))));
/**
* COM_QUIT is one-way message. Server doesn't respond to that.
* Therefore reply processing is unnecessary and session
* command property is not needed. It is just routed to both
* backends.
*/
if (packet_type == COM_QUIT)
{
int rc;
int rc2;
rc = master_dcb->func.write(master_dcb, gwbuf_clone(querybuf));
rc2 = slave_dcb->func.write(slave_dcb, querybuf);
if (rc == 1 && rc == rc2)
{
ret = 1;
}
goto return_ret;
}
prop = rses_property_init(RSES_PROP_TYPE_SESCMD);
/**
* Additional reference is created to querybuf to
* prevent it from being released before properties
* are cleaned up as a part of router sessionclean-up.
*/
mysql_sescmd_init(prop, querybuf, packet_type, router_cli_ses);
/** Lock router session */
if (!rses_begin_locked_router_action(router_cli_ses))
{
rses_property_done(prop);
goto return_ret;
}
/** Add sescmd property to router client session */
rses_property_add(router_cli_ses, prop);
/** Execute session command in master */
if (execute_sescmd_in_backend(router_cli_ses, BE_MASTER))
{
ret = 1;
}
else
{
/** Log error */
}
/** Execute session command in slave */
if (execute_sescmd_in_backend(router_cli_ses, BE_SLAVE))
{
ret = 1;
}
else
{
/** Log error */
}
/** Unlock router session */
rses_end_locked_router_action(router_cli_ses);
atomic_add(&inst->stats.n_all, 1);
goto return_ret;
break;
case QUERY_TYPE_BEGIN_TRX:
transaction_active = true;
LOGIF(LT, tracelog_routed_query(router_cli_ses,
"routeQuery",
master_dcb,
gwbuf_clone(querybuf)));
ret = master_dcb->func.write(master_dcb, querybuf);
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"%lu [routeQuery:rwsplit] Routed.",
pthread_self())));
atomic_add(&inst->stats.n_master, 1);
goto return_ret;
break;
case QUERY_TYPE_COMMIT:
case QUERY_TYPE_ROLLBACK:
transaction_active = false;
LOGIF(LT, tracelog_routed_query(router_cli_ses,
"routeQuery",
master_dcb,
gwbuf_clone(querybuf)));
ret = master_dcb->func.write(master_dcb, querybuf);
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"%lu [routeQuery:rwsplit] Routed.",
pthread_self())));
atomic_add(&inst->stats.n_master, 1);
goto return_ret;
break;
default:
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"%lu [routeQuery:rwsplit] Query type\t%s, "
"routing to Master by default.",
pthread_self(),
STRQTYPE(qtype))));
/**
* Is this really ok?
* What is not known is routed to master.
*/
LOGIF(LT, tracelog_routed_query(router_cli_ses,
"routeQuery",
master_dcb,
gwbuf_clone(querybuf)));
ret = master_dcb->func.write(master_dcb, querybuf);
atomic_add(&inst->stats.n_master, 1);
goto return_ret;
break;
} /*< switch by query type */
return_ret:
if (plainsqlbuf != NULL)
{
gwbuf_free(plainsqlbuf);
}
if (querystr != NULL)
{
free(querystr);
}
return ret;
}
/** to be inline'd */
/**
* @node Acquires lock to router client session if it is not closed.
*
* Parameters:
* @param rses - in, use
*
*
* @return true if router session was not closed. If return value is true
* it means that router is locked, and must be unlocked later. False, if
* router was closed before lock was acquired.
*
*
* @details (write detailed description here)
*
*/
static bool rses_begin_locked_router_action(
ROUTER_CLIENT_SES* rses)
{
bool succp = false;
CHK_CLIENT_RSES(rses);
if (rses->rses_closed) {
goto return_succp;
}
spinlock_acquire(&rses->rses_lock);
if (rses->rses_closed) {
spinlock_release(&rses->rses_lock);
goto return_succp;
}
succp = true;
return_succp:
if (!succp)
{
/** log that router session was closed */
}
return succp;
}
/** to be inline'd */
/**
* @node Releases router client session lock.
*
* Parameters:
* @param rses - <usage>
* <description>
*
* @return void
*
*
* @details (write detailed description here)
*
*/
static void rses_end_locked_router_action(
ROUTER_CLIENT_SES* rses)
{
CHK_CLIENT_RSES(rses);
spinlock_release(&rses->rses_lock);
}
/**
* Diagnostics routine
*
* Print query router statistics to the DCB passed in
*
* @param instance The router instance
* @param dcb The DCB for diagnostic output
*/
static void
diagnostic(ROUTER *instance, DCB *dcb)
{
ROUTER_CLIENT_SES *router_cli_ses;
ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance;
int i = 0;
spinlock_acquire(&router->lock);
router_cli_ses = router->connections;
while (router_cli_ses)
{
i++;
router_cli_ses = router_cli_ses->next;
}
spinlock_release(&router->lock);
dcb_printf(dcb,
"\tNumber of router sessions: %d\n",
router->stats.n_sessions);
dcb_printf(dcb,
"\tCurrent no. of router sessions: %d\n",
i);
dcb_printf(dcb,
"\tNumber of queries forwarded: %d\n",
router->stats.n_queries);
dcb_printf(dcb,
"\tNumber of queries forwarded to master: %d\n",
router->stats.n_master);
dcb_printf(dcb,
"\tNumber of queries forwarded to slave: %d\n",
router->stats.n_slave);
dcb_printf(dcb,
"\tNumber of queries forwarded to all: %d\n",
router->stats.n_all);
}
/**
* Client Reply routine
*
* The routine will reply to client for session change with master server data
*
* @param instance The router instance
* @param router_session The router session
* @param backend_dcb The backend DCB
* @param queue The GWBUF with reply data
*/
static void clientReply(
ROUTER* instance,
void* router_session,
GWBUF* writebuf,
DCB* backend_dcb)
{
DCB* master_dcb;
DCB* slave_dcb;
DCB* client_dcb;
ROUTER_CLIENT_SES* router_cli_ses;
sescmd_cursor_t* scur = NULL;
backend_type_t be_type = BE_UNDEFINED;
router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
CHK_CLIENT_RSES(router_cli_ses);
/**
* Lock router client session for secure read of router session members.
* Note that this could be done without lock by using version #
*/
if (!rses_begin_locked_router_action(router_cli_ses))
{
while ((writebuf = gwbuf_consume(
writebuf,
GWBUF_LENGTH(writebuf))) != NULL);
goto lock_failed;
}
master_dcb = router_cli_ses->rses_dcb[BE_MASTER];
slave_dcb = router_cli_ses->rses_dcb[BE_SLAVE];
/** Holding lock ensures that router session remains open */
ss_dassert(backend_dcb->session != NULL);
client_dcb = backend_dcb->session->client;
/** Unlock */
rses_end_locked_router_action(router_cli_ses);
/**
* 1. Check if backend received reply to sescmd.
* 2. Check sescmd's state whether OK_PACKET has been
* sent to client already and if not, lock property cursor,
* reply to client, and move property cursor forward. Finally
* release the lock.
* 3. If reply for this sescmd is sent, lock property cursor
* and
*/
if (client_dcb == NULL)
{
while ((writebuf = gwbuf_consume(
writebuf,
GWBUF_LENGTH(writebuf))) != NULL);
/** Log that client was closed before reply */
return;
}
if (backend_dcb == master_dcb)
{
be_type = BE_MASTER;
}
else if (backend_dcb == slave_dcb)
{
be_type = BE_SLAVE;
}
LOGIF(LT, tracelog_routed_query(router_cli_ses,
"reply_by_statement",
backend_dcb,
gwbuf_clone(writebuf)));
/** Lock router session */
if (!rses_begin_locked_router_action(router_cli_ses))
{
/** Log to debug that router was closed */
goto lock_failed;
}
scur = rses_get_sescmd_cursor(router_cli_ses, be_type);
/**
* Active cursor means that reply is from session command
* execution. Majority of the time there are no session commands
* being executed.
*/
if (sescmd_cursor_is_active(scur))
{
writebuf = sescmd_cursor_process_replies(client_dcb,
writebuf,
scur);
}
/** Unlock router session */
rses_end_locked_router_action(router_cli_ses);
if (writebuf != NULL && client_dcb != NULL)
{
/** Write reply to client DCB */
client_dcb->func.write(client_dcb, writebuf);
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"%lu [clientReply:rwsplit] client dcb %p, "
"backend dcb %p. End of normal reply.",
pthread_self(),
client_dcb,
backend_dcb)));
}
lock_failed:
return;
}
/**
* @node Search suitable backend server from those of router instance.
*
* Parameters:
* @param p_master - in, use, out
* Pointer to location where master's address is to be stored.
* If NULL, then master is not searched.
*
* @param p_slave - in, use, out
* Pointer to location where slave's address is to be stored.
* if NULL, then slave is not searched.
*
* @param inst - in, use
* Pointer to router instance
*
* @return true, if all what what requested found, false if the request
* was not satisfied or was partially satisfied.
*
*
* @details It is assumed that there is only one master among servers of
* a router instance. As a result, thr first master is always chosen.
*/
static bool search_backend_servers(
BACKEND** p_master,
BACKEND** p_slave,
ROUTER_INSTANCE* router)
{
BACKEND* local_backend[BE_COUNT] = {NULL,NULL};
int i;
bool succp = true;
/*
* Loop over all the servers and find any that have fewer connections
* than current candidate server.
*
* If a server has less connections than the current candidate it is
* chosen to a new candidate.
*
* If a server has the same number of connections currently as the
* candidate and has had less connections over time than the candidate
* it will also become the new candidate. This has the effect of
* spreading the connections over different servers during periods of
* very low load.
*
* If master is searched for, the first master found is chosen.
*/
for (i = 0; router->servers[i] != NULL; i++) {
BACKEND* be = router->servers[i];
if (be != NULL) {
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"%lu [search_backend_servers] Examine server "
"%s:%d with %d connections. Status is %d, "
"router->bitvalue is %d",
pthread_self(),
be->backend_server->name,
be->backend_server->port,
be->backend_conn_count,
be->backend_server->status,
router->bitmask)));
}
if (be != NULL &&
SERVER_IS_RUNNING(be->backend_server) &&
(be->backend_server->status & router->bitmask) ==
router->bitvalue)
{
if (SERVER_IS_SLAVE(be->backend_server) &&
p_slave != NULL)
{
/**
* If no candidate set, set first running
* server as an initial candidate server.
*/
if (local_backend[BE_SLAVE] == NULL)
{
local_backend[BE_SLAVE] = be;
}
else if (be->backend_conn_count <
local_backend[BE_SLAVE]->backend_conn_count)
{
/**
* This running server has fewer
* connections, set it as a new
* candidate.
*/
local_backend[BE_SLAVE] = be;
}
else if (be->backend_conn_count ==
local_backend[BE_SLAVE]->backend_conn_count &&
be->backend_server->stats.n_connections <
local_backend[BE_SLAVE]->backend_server->stats.n_connections)
{
/**
* This running server has the same
* number of connections currently
* as the candidate but has had
* fewer connections over time
* than candidate, set this server
* to candidate.
*/
local_backend[BE_SLAVE] = be;
}
}
else if (p_master != NULL &&
local_backend[BE_MASTER] == NULL &&
SERVER_IS_MASTER(be->backend_server))
{
local_backend[BE_MASTER] = be;
}
else if (p_master != NULL &&
local_backend[BE_JOINED] == NULL &&
SERVER_IS_JOINED(be->backend_server))
{
local_backend[BE_JOINED] = be;
}
}
}
if (router->bitvalue != 0 &&
p_master != NULL &&
local_backend[BE_JOINED] == NULL)
{
succp = false;
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Couldn't find a Joined Galera node from %d "
"candidates.",
i)));
goto return_succp;
}
if (p_slave != NULL && local_backend[BE_SLAVE] == NULL) {
succp = false;
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Couldn't find suitable Slave from %d "
"candidates.",
i)));
}
if (p_master != NULL && local_backend[BE_MASTER] == NULL) {
succp = false;
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Couldn't find suitable Master from %d "
"candidates.",
i)));
}
if (local_backend[BE_SLAVE] != NULL) {
*p_slave = local_backend[BE_SLAVE];
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"%lu [readwritesplit:search_backend_servers] Selected "
"Slave %s:%d from %d candidates.",
pthread_self(),
local_backend[BE_SLAVE]->backend_server->name,
local_backend[BE_SLAVE]->backend_server->port,
i)));
}
if (local_backend[BE_MASTER] != NULL) {
*p_master = local_backend[BE_MASTER];
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"%lu [readwritesplit:search_backend_servers] Selected "
"Master %s:%d "
"from %d candidates.",
pthread_self(),
local_backend[BE_MASTER]->backend_server->name,
local_backend[BE_MASTER]->backend_server->port,
i)));
}
return_succp:
return succp;
}
/**
* Create a generic router session property strcture.
*/
static rses_property_t* rses_property_init(
rses_property_type_t prop_type)
{
rses_property_t* prop;
prop = (rses_property_t*)calloc(1, sizeof(rses_property_t));
if (prop == NULL)
{
goto return_prop;
}
prop->rses_prop_type = prop_type;
#if defined(SS_DEBUG)
prop->rses_prop_chk_top = CHK_NUM_ROUTER_PROPERTY;
prop->rses_prop_chk_tail = CHK_NUM_ROUTER_PROPERTY;
#endif
return_prop:
CHK_RSES_PROP(prop);
return prop;
}
/**
* Property is freed at the end of router client session.
*/
static void rses_property_done(
rses_property_t* prop)
{
CHK_RSES_PROP(prop);
switch (prop->rses_prop_type) {
case RSES_PROP_TYPE_SESCMD:
mysql_sescmd_done(&prop->rses_prop_data.sescmd);
break;
default:
LOGIF(LD, (skygw_log_write_flush(
LOGFILE_DEBUG,
"%lu [rses_property_done] Unknown property type %d "
"in property %p",
pthread_self(),
prop->rses_prop_type,
prop)));
ss_dassert(false);
break;
}
free(prop);
}
/**
* Add property to the router_client_ses structure's rses_properties
* array. The slot is determined by the type of property.
* In each slot there is a list of properties of similar type.
*
* Router client session must be locked.
*/
static void rses_property_add(
ROUTER_CLIENT_SES* rses,
rses_property_t* prop)
{
rses_property_t* p;
CHK_CLIENT_RSES(rses);
CHK_RSES_PROP(prop);
ss_dassert(SPINLOCK_IS_LOCKED(&rses->rses_lock));
prop->rses_prop_rsession = rses;
p = rses->rses_properties[prop->rses_prop_type];
if (p == NULL)
{
rses->rses_properties[prop->rses_prop_type] = prop;
}
else
{
while (p->rses_prop_next != NULL)
{
p = p->rses_prop_next;
}
p->rses_prop_next = prop;
}
}
/**
* Router session must be locked.
* Return session command pointer if succeed, NULL if failed.
*/
static mysql_sescmd_t* rses_property_get_sescmd(
rses_property_t* prop)
{
mysql_sescmd_t* sescmd;
CHK_RSES_PROP(prop);
ss_dassert(prop->rses_prop_rsession == NULL ||
SPINLOCK_IS_LOCKED(&prop->rses_prop_rsession->rses_lock));
sescmd = &prop->rses_prop_data.sescmd;
if (sescmd != NULL)
{
CHK_MYSQL_SESCMD(sescmd);
}
return sescmd;
}
/**
static void rses_begin_locked_property_action(
rses_property_t* prop)
{
CHK_RSES_PROP(prop);
spinlock_acquire(&prop->rses_prop_lock);
}
static void rses_end_locked_property_action(
rses_property_t* prop)
{
CHK_RSES_PROP(prop);
spinlock_release(&prop->rses_prop_lock);
}
*/
/**
* Create session command property.
*/
static mysql_sescmd_t* mysql_sescmd_init (
rses_property_t* rses_prop,
GWBUF* sescmd_buf,
unsigned char packet_type,
ROUTER_CLIENT_SES* rses)
{
mysql_sescmd_t* sescmd;
CHK_RSES_PROP(rses_prop);
/** Can't call rses_property_get_sescmd with uninitialized sescmd */
sescmd = &rses_prop->rses_prop_data.sescmd;
sescmd->my_sescmd_prop = rses_prop; /*< reference to owning property */
#if defined(SS_DEBUG)
sescmd->my_sescmd_chk_top = CHK_NUM_MY_SESCMD;
sescmd->my_sescmd_chk_tail = CHK_NUM_MY_SESCMD;
#endif
/** Set session command buffer */
sescmd->my_sescmd_buf = sescmd_buf;
sescmd->my_sescmd_packet_type = packet_type;
return sescmd;
}
static void mysql_sescmd_done(
mysql_sescmd_t* sescmd)
{
CHK_RSES_PROP(sescmd->my_sescmd_prop);
gwbuf_free(sescmd->my_sescmd_buf);
memset(sescmd, 0, sizeof(mysql_sescmd_t));
}
/**
* All cases where backend message starts at least with one response to session
* command are handled here.
* Read session commands from property list. If command is already replied,
* discard packet. Else send reply to client. In both cases move cursor forward
* until all session command replies are handled.
*
* Cases that are expected to happen and which are handled:
* s = response not yet replied to client, S = already replied response,
* q = query
* 1. q+ for example : select * from mysql.user
* 2. s+ for example : set autocommit=1
* 3. S+
* 4. sq+
* 5. Sq+
* 6. Ss+
* 7. Ss+q+
* 8. S+q+
* 9. s+q+
*/
static GWBUF* sescmd_cursor_process_replies(
DCB* client_dcb,
GWBUF* replybuf,
sescmd_cursor_t* scur)
{
const size_t headerlen = 4; /*< mysql packet header */
uint8_t* packet;
size_t packetlen;
mysql_sescmd_t* scmd;
ss_dassert(SPINLOCK_IS_LOCKED(&(scur->scmd_cur_rses->rses_lock)));
scmd = sescmd_cursor_get_command(scur);
CHK_DCB(client_dcb);
CHK_GWBUF(replybuf);
/**
* Walk through packets in the message and the list of session
*commands.
*/
while (scmd != NULL && replybuf != NULL)
{
if (scmd->my_sescmd_is_replied)
{
/**
* Discard heading packets if their related command is
* already replied.
*/
CHK_GWBUF(replybuf);
packet = (uint8_t *)GWBUF_DATA(replybuf);
packetlen = packet[0]+packet[1]*256+packet[2]*256*256;
replybuf = gwbuf_consume(replybuf, packetlen+headerlen);
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"%lu [sescmd_cursor_process_replies] cmd %p "
"is already replied. Discarded %d bytes from "
"the %s replybuffer.",
pthread_self(),
scmd,
packetlen+headerlen,
STRBETYPE(scur->scmd_cur_be_type))));
}
else
{
/** Mark the rest session commands as replied */
scmd->my_sescmd_is_replied = true;
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"%lu [sescmd_cursor_process_replies] Marked "
"cmd %p to as replied. Left message to %s's "
"buffer for reply.",
pthread_self(),
scmd,
STRBETYPE(scur->scmd_cur_be_type))));
}
if (sescmd_cursor_next(scur))
{
scmd = sescmd_cursor_get_command(scur);
}
else
{
scmd = NULL;
/** All session commands are replied */
scur->scmd_cur_active = false;
}
}
ss_dassert(replybuf == NULL || *scur->scmd_cur_ptr_property == NULL);
return replybuf;
}
/**
* Get the address of current session command.
*
* Router session must be locked */
static mysql_sescmd_t* sescmd_cursor_get_command(
sescmd_cursor_t* scur)
{
mysql_sescmd_t* scmd;
ss_dassert(SPINLOCK_IS_LOCKED(&(scur->scmd_cur_rses->rses_lock)));
scur->scmd_cur_cmd = rses_property_get_sescmd(*scur->scmd_cur_ptr_property);
CHK_MYSQL_SESCMD(scur->scmd_cur_cmd);
scmd = scur->scmd_cur_cmd;
return scmd;
}
/** router must be locked */
static sescmd_cursor_t* rses_get_sescmd_cursor(
ROUTER_CLIENT_SES* rses,
backend_type_t be_type)
{
CHK_CLIENT_RSES(rses);
ss_dassert(SPINLOCK_IS_LOCKED(&rses->rses_lock));
return &rses->rses_cursor[be_type];
}
/** router must be locked */
static bool sescmd_cursor_is_active(
sescmd_cursor_t* sescmd_cursor)
{
bool succp;
ss_dassert(SPINLOCK_IS_LOCKED(&sescmd_cursor->scmd_cur_rses->rses_lock));
succp = sescmd_cursor->scmd_cur_active;
return succp;
}
/** router must be locked */
static void sescmd_cursor_set_active(
sescmd_cursor_t* sescmd_cursor,
bool value)
{
ss_dassert(SPINLOCK_IS_LOCKED(&sescmd_cursor->scmd_cur_rses->rses_lock));
/** avoid calling unnecessarily */
ss_dassert(sescmd_cursor->scmd_cur_active != value);
sescmd_cursor->scmd_cur_active = value;
}
/**
* Clone session command's command buffer.
* Router session must be locked
*/
static GWBUF* sescmd_cursor_clone_querybuf(
sescmd_cursor_t* scur)
{
GWBUF* buf;
ss_dassert(scur->scmd_cur_cmd != NULL);
buf = gwbuf_clone(scur->scmd_cur_cmd->my_sescmd_buf);
CHK_GWBUF(buf);
return buf;
}
/**
* If session command cursor is passive, sends the command to backend for
* execution.
*
* Returns true if command was sent or added successfully to the queue.
* Returns false if command sending failed or if there are no pending session
* commands.
*
* Router session must be locked.
*/
static bool execute_sescmd_in_backend(
ROUTER_CLIENT_SES* rses,
backend_type_t be_type)
{
DCB* dcb;
bool succp = true;
int rc = 0;
sescmd_cursor_t* scur;
dcb = rses->rses_dcb[be_type];
CHK_DCB(dcb);
CHK_CLIENT_RSES(rses);
ss_dassert(SPINLOCK_IS_LOCKED(&rses->rses_lock));
/**
* Get cursor pointer and copy of command buffer to cursor.
*/
scur = rses_get_sescmd_cursor(rses, be_type);
/** Return if there are no pending ses commands */
if (sescmd_cursor_get_command(scur) == NULL)
{
succp = false;
goto return_succp;
}
if (!sescmd_cursor_is_active(scur))
{
/** Cursor is left active when function returns. */
sescmd_cursor_set_active(scur, true);
}
LOGIF(LT, tracelog_routed_query(rses,
"execute_sescmd_in_backend",
dcb,
sescmd_cursor_clone_querybuf(scur)));
switch (scur->scmd_cur_cmd->my_sescmd_packet_type) {
case COM_CHANGE_USER:
rc = dcb->func.auth(
dcb,
NULL,
dcb->session,
sescmd_cursor_clone_querybuf(scur));
break;
case COM_QUIT:
case COM_QUERY:
case COM_INIT_DB:
default:
rc = dcb->func.write(
dcb,
sescmd_cursor_clone_querybuf(scur));
break;
}
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"%lu [execute_sescmd_in_backend] Routed %s cmd %p.",
pthread_self(),
STRPACKETTYPE(scur->scmd_cur_cmd->my_sescmd_packet_type),
scur->scmd_cur_cmd)));
if (rc != 1)
{
succp = false;
}
return_succp:
return succp;
}
/**
* Moves cursor to next property and copied address of its sescmd to cursor.
* Current propery must be non-null.
* If current property is the last on the list, *scur->scmd_ptr_property == NULL
*
* Router session must be locked
*/
static bool sescmd_cursor_next(
sescmd_cursor_t* scur)
{
bool succp = false;
rses_property_t* prop_curr;
rses_property_t* prop_next;
ss_dassert(scur != NULL);
ss_dassert(*(scur->scmd_cur_ptr_property) != NULL);
ss_dassert(SPINLOCK_IS_LOCKED(
&(*(scur->scmd_cur_ptr_property))->rses_prop_rsession->rses_lock));
/** Illegal situation */
if (scur == NULL ||
*scur->scmd_cur_ptr_property == NULL ||
scur->scmd_cur_cmd == NULL)
{
/** Log error */
goto return_succp;
}
prop_curr = *(scur->scmd_cur_ptr_property);
CHK_MYSQL_SESCMD(scur->scmd_cur_cmd);
ss_dassert(prop_curr == mysql_sescmd_get_property(scur->scmd_cur_cmd));
CHK_RSES_PROP(prop_curr);
/** Copy address of pointer to next property */
scur->scmd_cur_ptr_property = &(prop_curr->rses_prop_next);
prop_next = *scur->scmd_cur_ptr_property;
ss_dassert(prop_next == *(scur->scmd_cur_ptr_property));
/** If there is a next property move forward */
if (prop_next != NULL)
{
CHK_RSES_PROP(prop_next);
CHK_RSES_PROP((*(scur->scmd_cur_ptr_property)));
/** Get pointer to next property's sescmd */
scur->scmd_cur_cmd = rses_property_get_sescmd(prop_next);
ss_dassert(prop_next == scur->scmd_cur_cmd->my_sescmd_prop);
CHK_MYSQL_SESCMD(scur->scmd_cur_cmd);
CHK_RSES_PROP(scur->scmd_cur_cmd->my_sescmd_prop);
}
else
{
/** No more properties, can't proceed. */
goto return_succp;
}
if (scur->scmd_cur_cmd != NULL)
{
succp = true;
}
else
{
ss_dassert(false); /*< Log error, sescmd shouldn't be NULL */
}
return_succp:
return succp;
}
static rses_property_t* mysql_sescmd_get_property(
mysql_sescmd_t* scmd)
{
CHK_MYSQL_SESCMD(scmd);
return scmd->my_sescmd_prop;
}
static void tracelog_routed_query(
ROUTER_CLIENT_SES* rses,
char* funcname,
DCB* dcb,
GWBUF* buf)
{
uint8_t* packet = GWBUF_DATA(buf);
unsigned char packet_type = packet[4];
size_t len;
size_t buflen = GWBUF_LENGTH(buf);
char* querystr;
char* startpos = (char *)&packet[5];
backend_type_t be_type;
if (rses->rses_dcb[BE_MASTER] == dcb)
{
be_type = BE_MASTER;
}
else if (rses->rses_dcb[BE_SLAVE] == dcb)
{
be_type = BE_SLAVE;
}
else
{
be_type = BE_UNDEFINED;
}
if (GWBUF_TYPE(buf) == GWBUF_TYPE_MYSQL)
{
len = packet[0];
len += 256*packet[1];
len += 256*256*packet[2];
if (packet_type == '\x03')
{
querystr = (char *)malloc(len);
memcpy(querystr, startpos, len-1);
querystr[len-1] = '\0';
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"%lu [%s] %d bytes long buf, \"%s\" -> %s:%d %s dcb %p",
pthread_self(),
funcname,
buflen,
querystr,
(be_type == BE_MASTER ?
rses->rses_backend[BE_MASTER]->backend_server->name :
(be_type == BE_SLAVE ?
rses->rses_backend[BE_SLAVE]->backend_server->name :
"Target DCB is neither of the backends. This is error")),
(be_type == BE_MASTER ?
rses->rses_backend[BE_MASTER]->backend_server->port :
(be_type == BE_SLAVE ?
rses->rses_backend[BE_SLAVE]->backend_server->port :
-1)),
STRBETYPE(be_type),
dcb)));
}
}
gwbuf_free(buf);
}
/**
* Return rc, rc < 0 if router session is closed. rc == 0 if there are no
* capabilities specified, rc > 0 when there are capabilities.
*/
static uint8_t getCapabilities (
ROUTER* inst,
void* router_session)
{
ROUTER_CLIENT_SES* rses = (ROUTER_CLIENT_SES *)router_session;
uint8_t rc;
if (!rses_begin_locked_router_action(rses))
{
rc = 0xff;
goto return_rc;
}
rc = rses->rses_capabilities;
rses_end_locked_router_action(rses);
return_rc:
return rc;
}