Implementation of shared buffer level in the gwbuf so that one set of data can be kept with

different offsets for different gwbufs

Updated monitor to better handle maste/slave replication environments

Split MASTER and SERVER bits in the erver bitmask so that we canhave more states, MASTER, SLAVE and NOT IN REPLICATION

Updates to the read write splitter for routing commands to all nodes, diagnostic output and fixes to the algorithm to pick up a master and slave connection
This commit is contained in:
Mark Riddoch 2013-07-11 18:37:43 +02:00
parent aea8af08ba
commit 619af90696
9 changed files with 243 additions and 135 deletions

View File

@ -29,11 +29,13 @@
*
* Date Who Description
* 10/06/13 Mark Riddoch Initial implementation
* 11/07/13 Mark Riddoch Add reference count mechanism
*
* @endverbatim
*/
#include <stdlib.h>
#include <buffer.h>
#include <atomic.h>
/**
@ -50,7 +52,8 @@
GWBUF *
gwbuf_alloc(unsigned int size)
{
GWBUF *rval;
GWBUF *rval;
SHARED_BUF *sbuf;
// Allocate the buffer header
if ((rval = (GWBUF *)malloc(sizeof(GWBUF))) == NULL)
@ -58,14 +61,24 @@ GWBUF *rval;
return NULL;
}
// Allocate the space for the actual data
if ((rval->data = (unsigned char *)malloc(size)) == NULL)
// Allocate the shared data buffer
if ((sbuf = (SHARED_BUF *)malloc(sizeof(SHARED_BUF))) == NULL)
{
free(rval);
return NULL;
}
rval->start = rval->data;
// Allocate the space for the actual data
if ((sbuf->data = (unsigned char *)malloc(size)) == NULL)
{
free(rval);
free(sbuf);
return NULL;
}
rval->start = sbuf->data;
rval->end = rval->start + size;
sbuf->refcount = 1;
rval->sbuf = sbuf;
rval->next = NULL;
return rval;
@ -79,10 +92,42 @@ GWBUF *rval;
void
gwbuf_free(GWBUF *buf)
{
free(buf->data);
atomic_add(&buf->sbuf->refcount, -1);
if (buf->sbuf->refcount == 0)
{
free(buf->sbuf->data);
free(buf->sbuf);
}
free(buf);
}
/**
* Increment the usage count of a gateway buffer. This gets a new
* GWBUF structure that shares the actual data with the existing
* GWBUF structure but allows for the data copy to be avoided and
* also for each GWBUF to point to different portions of the same
* SHARED_BUF.
*
* @param buf The buffer to use
* @return A new GWBUF structure
*/
GWBUF *
gwbuf_clone(GWBUF *buf)
{
GWBUF *rval;
if ((rval = (GWBUF *)malloc(sizeof(GWBUF))) == NULL)
{
return NULL;
}
atomic_add(&buf->sbuf->refcount, 1);
rval->sbuf = buf->sbuf;
rval->start = buf->start;
rval->end = buf->end;
rval->next = NULL;
return rval;
}
/**
* Append a buffer onto a linked list of buffer structures.
*

View File

@ -204,14 +204,14 @@ char *status = NULL;
if ((status = (char *)malloc(200)) == NULL)
return NULL;
status[0] = 0;
if (server->status & SERVER_RUNNING)
strcat(status, "Running, ");
else
strcat(status, "Down, ");
if (server->status & SERVER_MASTER)
strcat(status, "Master");
strcat(status, "Master, ");
if (server->status & SERVER_SLAVE)
strcat(status, "Slave, ");
if (server->status & SERVER_RUNNING)
strcat(status, "Running");
else
strcat(status, "Slave");
strcat(status, "Down");
return status;
}

View File

@ -36,10 +36,21 @@
*
* Date Who Description
* 10/06/13 Mark Riddoch Initial implementation
* 11/07/13 Mark Riddoch Addition of reference count in the gwbuf
*
* @endverbatim
*/
/**
* A structure to encapsualte the data in a form that the data itself can be
* shared between multiple GWBUF's without the need to make multiple copies
* but still maintain separate data pointers.
*/
typedef struct {
unsigned char *data; /**< Physical memory that was allocated */
int refcount; /**< Reference count on the buffer */
} SHARED_BUF;
/**
* The buffer structure used by the descriptor control blocks.
*
@ -52,7 +63,7 @@ typedef struct gwbuf {
struct gwbuf *next; /**< Next buffer in a linked chain of buffers */
void *start; /**< Start of the valid data */
void *end; /**< First byte after the valid data */
unsigned char *data; /**< Physical memory that was allocated */
SHARED_BUF *sbuf; /**< The shared buffer with the real data */
} GWBUF;
/*
@ -74,10 +85,11 @@ typedef struct gwbuf {
/*
* Function prototypes for the API to maniplate the buffers
*/
extern GWBUF *gwbuf_alloc(unsigned int size);
extern void gwbuf_free(GWBUF *buf);
extern GWBUF *gwbuf_append(GWBUF *head, GWBUF *tail);
extern GWBUF *gwbuf_consume(GWBUF *head, unsigned int length);
extern GWBUF *gwbuf_alloc(unsigned int size);
extern void gwbuf_free(GWBUF *buf);
extern GWBUF *gwbuf_clone(GWBUF *buf);
extern GWBUF *gwbuf_append(GWBUF *head, GWBUF *tail);
extern GWBUF *gwbuf_consume(GWBUF *head, unsigned int length);
extern unsigned int gwbuf_length(GWBUF *head);

View File

@ -68,6 +68,7 @@ typedef struct server {
*/
#define SERVER_RUNNING 0x0001 /**<< The server is up and running */
#define SERVER_MASTER 0x0002 /**<< The server is a master, i.e. can handle writes */
#define SERVER_SLAVE 0x0004 /**<< The server is a slave, i.e. can handle reads */
/**
* Is the server running - the macro returns true if the server is marked as running
@ -84,13 +85,13 @@ typedef struct server {
* in order for the macro to return true
*/
#define SERVER_IS_MASTER(server) \
(((server)->status & (SERVER_RUNNING|SERVER_MASTER)) == (SERVER_RUNNING|SERVER_MASTER))
(((server)->status & (SERVER_RUNNING|SERVER_MASTER|SERVER_SLAVE)) == (SERVER_RUNNING|SERVER_MASTER))
/**
* Is the server a slave? The server must be both running and marked as a slave
* in order for the macro to return true
*/
#define SERVER_IS_SLAVE(server) \
(((server)->status & (SERVER_RUNNING|SERVER_MASTER)) == SERVER_RUNNING)
(((server)->status & (SERVER_RUNNING|SERVER_MASTER|SERVER_SLAVE)) == (SERVER_RUNNING|SERVER_SLAVE))
extern SERVER *server_alloc(char *, char *, unsigned short);

View File

@ -60,6 +60,9 @@ struct client_session {
typedef struct {
int n_sessions; /**< Number sessions created */
int n_queries; /**< Number of queries forwarded */
int n_master; /**< Number of statements sent to master */
int n_slave; /**< Number of statements sent to slave */
int n_all; /**< Number of statements sent to all */
} ROUTER_STATS;
@ -71,9 +74,7 @@ struct instance {
CLIENT_SESSION* connections; /**< Link list of all the client connections */
SPINLOCK lock; /**< Spinlock for the instance data */
BACKEND** servers; /**< The set of backend servers for this instance */
BACKEND* master; /**< NULL if not known, pointer otherwise */
unsigned int bitmask; /**< Bitmask to apply to server->status */
unsigned int bitvalue;/**< Required value of server->status */
BACKEND* master; /**< NULL if not known, pointer otherwise */
ROUTER_STATS stats; /**< Statistics for this router */
INSTANCE* next;
};

View File

@ -15,12 +15,29 @@
*
* Copyright SkySQL Ab 2013
*/
/**
* @file mysql_mon.c - A MySQL replication cluster monitor
*
* @verbatim
* Revision History
*
* Date Who Description
* 08/07/13 Mark Riddoch Initial implementation
* 11/07/13 Mark Riddoch Addition of code to check replication
* status
*
* @endverbatim
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <monitor.h>
#include <mysqlmon.h>
#include <thread.h>
#include <mysql.h>
#include <mysqld_error.h>
#include <skygw_utils.h>
#include <log_manager.h>
@ -182,6 +199,11 @@ MONITOR_SERVERS *ptr, *lptr;
static void
monitorDatabase(MONITOR_SERVERS *database)
{
MYSQL_ROW row;
MYSQL_RES *result;
int num_fields;
int ismaster = 0, isslave = 0;
if (database->con == NULL || mysql_ping(database->con) != 0)
{
database->con = mysql_init(NULL);
@ -194,8 +216,58 @@ monitorDatabase(MONITOR_SERVERS *database)
}
}
// If we get this far then we have a workign connection
/* If we get this far then we have a working connection */
server_set_status(database->server, SERVER_RUNNING);
/* Check SHOW SLAVE HOSTS - if we get rows then we are a master */
if (mysql_query(database->con, "SHOW SLAVE HOSTS"))
{
if (mysql_errno(database->con) == ER_SPECIFIC_ACCESS_DENIED_ERROR)
{
/* Log lack of permission */
}
}
else if ((result = mysql_store_result(database->con)) != NULL)
{
num_fields = mysql_num_fields(result);
while ((row = mysql_fetch_row(result)))
{
ismaster = 1;
}
mysql_free_result(result);
}
/* Check if the Slave_SQL_Running and Slave_IO_Running status is
* set to Yes
*/
if (mysql_query(database->con, "SHOW SLAVE STATUS") == 0
&& (result = mysql_store_result(database->con)) != NULL)
{
num_fields = mysql_num_fields(result);
while ((row = mysql_fetch_row(result)))
{
if (strncmp(row[10], "Yes", 3) == 0
&& strncmp(row[11], "Yes", 3) == 0)
isslave = 1;
}
mysql_free_result(result);
}
if (ismaster)
{
server_set_status(database->server, SERVER_MASTER);
server_clear_status(database->server, SERVER_SLAVE);
}
else if (isslave)
{
server_set_status(database->server, SERVER_SLAVE);
server_clear_status(database->server, SERVER_MASTER);
}
if (ismaster == 0 && isslave == 0)
{
server_clear_status(database->server, SERVER_SLAVE);
server_clear_status(database->server, SERVER_MASTER);
}
}
@ -207,29 +279,30 @@ monitorDatabase(MONITOR_SERVERS *database)
static void
monitorMain(void *arg)
{
MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg;
MONITOR_SERVERS *ptr;
MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg;
MONITOR_SERVERS *ptr;
if (mysql_thread_init()) {
skygw_log_write_flush(NULL,
if (mysql_thread_init())
{
skygw_log_write_flush(NULL,
LOGFILE_ERROR,
"Fatal : mysql_init_thread failed in monitor "
"module. Exiting.\n");
return ;
}
return;
}
while (1)
{
thread_millisleep(1000);
if (handle->shutdown) {
mysql_thread_end();
if (handle->shutdown)
{
mysql_thread_end();
return;
}
}
ptr = handle->databases;
while (ptr)
{
monitorDatabase(ptr);
ptr = ptr->next;
}
thread_millisleep(10000);
}
}

View File

@ -345,6 +345,7 @@ static struct {
} ServerBits[] = {
{ "running", SERVER_RUNNING },
{ "master", SERVER_MASTER },
{ "slave", SERVER_SLAVE },
{ NULL, 0 }
};
/**

View File

@ -192,13 +192,13 @@ int i, n;
{
if (!strcasecmp(options[i], "master"))
{
inst->bitmask |= SERVER_MASTER;
inst->bitmask |= (SERVER_MASTER|SERVER_SLAVE);
inst->bitvalue |= SERVER_MASTER;
}
else if (!strcasecmp(options[i], "slave"))
{
inst->bitmask |= SERVER_MASTER;
inst->bitvalue &= ~SERVER_MASTER;
inst->bitmask |= (SERVER_MASTER|SERVER_SLAVE);
inst->bitvalue |= SERVER_SLAVE;
}
}
}

View File

@ -55,38 +55,6 @@ static ROUTER_OBJECT MyObject =
static SPINLOCK instlock;
static INSTANCE* instances;
#if defined(SS_DEBUG)
static void vilhos_test_for_query_classifier(void)
{
MYSQL* mysql = NULL;
ss_dassert(mysql_thread_safe());
mysql_thread_init();
char* str = (char *)calloc(1,
sizeof("Query type is ")+
sizeof("QUERY_TYPE_SESSION_WRITE"));
/**
* Call query classifier.
*/
sprintf(str,
"Query type is %s\n",
STRQTYPE(
skygw_query_classifier_get_type(
"SELECT user from mysql.user", 0)));
/**
* generate some log
*/
skygw_log_write(NULL, LOGFILE_MESSAGE,str);
mysql_close(mysql);
mysql_thread_end();
ss_dfprintf(stderr, "\n<< testmain\n");
fflush(stderr);
}
#endif /* SS_DEBUG */
/**
* Implementation of the mandatory version entry point
*
@ -110,9 +78,6 @@ ModuleInit()
"Initialize read/write split router module.\n");
spinlock_init(&instlock);
instances = NULL;
#if defined(NOMORE)
vilhos_test_for_query_classifier();
#endif
}
/**
@ -193,25 +158,6 @@ static ROUTER* createInstance(
}
inst->servers[n] = NULL;
/*
* Process the options
*/
inst->bitmask = 0;
inst->bitvalue = 0;
if (options) {
for (i = 0; options[i]; i++) {
if (!strcasecmp(options[i], "master")) {
inst->bitmask |= SERVER_MASTER;
inst->bitvalue |= SERVER_MASTER;
ss_dassert(inst->master == NULL);
inst->master = inst->servers[i];
} else if (!strcasecmp(options[i], "slave")) {
inst->bitmask |= SERVER_MASTER;
inst->bitvalue &= ~SERVER_MASTER;
}
} /* for */
}
/**
* We have completed the creation of the instance data, so now
* insert this router instance into the linked list of routers
@ -244,25 +190,26 @@ static void* newSession(
INSTANCE* inst = (INSTANCE *)instance;
int i;
if ((client = (CLIENT_SESSION *)malloc(sizeof(CLIENT_SESSION))) == NULL) {
if ((client = (CLIENT_SESSION *)malloc(sizeof(CLIENT_SESSION))) == NULL)
{
return NULL;
}
/**
* Find a backend server to connect to. This is the extent of the
* load balancing algorithm we need to implement for this simple
* connection router.
*/
/** First find a running server to set as our initial candidate server */
for (i = 0; inst->servers[i]; i++) {
for (i = 0; inst->servers[i]; i++)
{
if (inst->servers[i] && SERVER_IS_RUNNING(inst->servers[i]->server) &&
(inst->servers[i]->server->status & inst->bitmask) == inst->bitvalue)
if (inst->servers[i] && SERVER_IS_SLAVE(inst->servers[i]->server))
{
candidate = inst->servers[i];
break;
}
}
/**
* Loop over all the servers and find any that have fewer connections than our
* candidate server.
@ -275,13 +222,12 @@ static void* newSession(
* become the new candidate. This has the effect of spreading the connections
* over different servers during periods of very low load.
*/
for (i = 1; inst->servers[i]; i++) {
for (i = 0; inst->servers[i]; i++) {
if (inst->servers[i]
&& SERVER_IS_RUNNING(inst->servers[i]->server))
{
if ((inst->servers[i]->server->status & inst->bitmask)
== inst->bitvalue)
if (SERVER_IS_SLAVE(inst->servers[i]->server))
{
if (inst->servers[i]->count < candidate->count) {
candidate = inst->servers[i];
@ -314,29 +260,27 @@ static void* newSession(
/**
* Open the slave connection.
*/
if ((client->slaveconn = dcb_connect(candidate->server,
session,
candidate->server->protocol)) == NULL)
{
atomic_add(&candidate->count, -1);
free(client);
return NULL;
}
/**
* Open the master connection.
*/
if ((client->masterconn =
dcb_connect(client->master->server,
session,
client->master->server->protocol)) == NULL)
{
atomic_add(&client->master->count, -1);
free(client);
return NULL;
}
inst->stats.n_sessions += 1;
/* Add this session to end of the list of active sessions */
spinlock_acquire(&inst->lock);
if ((client->slaveconn = dcb_connect(candidate->server, session,
candidate->server->protocol)) == NULL)
{
atomic_add(&candidate->count, -1);
free(client);
return NULL;
}
/**
* Open the master connection.
*/
if ((client->masterconn = dcb_connect(client->master->server, session,
client->master->server->protocol)) == NULL)
{
atomic_add(&client->master->count, -1);
free(client);
return NULL;
}
inst->stats.n_sessions += 1;
/* Add this session to end of the list of active sessions */
spinlock_acquire(&inst->lock);
client->next = inst->connections;
inst->connections = client;
spinlock_release(&inst->lock);
@ -419,23 +363,22 @@ static int routeQuery(
char* querystr = NULL;
char* startpos;
size_t len;
unsigned char packet_type;
unsigned char packet_type, *packet;
int ret = 0;
INSTANCE* inst = (INSTANCE *)instance;
CLIENT_SESSION* session = (CLIENT_SESSION *)router_session;
inst->stats.n_queries++;
packet_type = (unsigned char)queue->data[4];
startpos = (char *)&queue->data[5];
len = (unsigned char)queue->data[0];
len += 255*(unsigned char)queue->data[1];
len += 255*255*((unsigned char)queue->data[2]);
packet = GWBUF_DATA(queue);
packet_type = packet[4];
startpos = (char *)&packet[5];
len = packet[0];
len += 255*packet[1];
len += 255*255*packet[2];
switch(packet_type) {
case COM_INIT_DB: /**< 2 */
case COM_CREATE_DB: /**< 5 */
case COM_DROP_DB: /**< 6 */
case COM_INIT_DB: /**< 2 DDL must go to the master */
case COM_REFRESH: /**< 7 - I guess this is session but not sure */
case COM_DEBUG: /**< 0d all servers dump debug info to stdout */
case COM_PING: /**< 0e all servers are pinged */
@ -443,6 +386,11 @@ static int routeQuery(
qtype = QUERY_TYPE_SESSION_WRITE;
break;
case COM_CREATE_DB: /**< 5 DDL must go to the master */
case COM_DROP_DB: /**< 6 DDL must go to the master */
qtype = QUERY_TYPE_WRITE;
break;
case COM_QUERY:
querystr = (char *)malloc(len);
memcpy(querystr, startpos, len-1);
@ -451,7 +399,7 @@ static int routeQuery(
break;
default:
case COM_SHUTDOWN: /**< 8 where shutdown soulh be routed ? */
case COM_SHUTDOWN: /**< 8 where should shutdown be routed ? */
case COM_STATISTICS: /**< 9 ? */
case COM_PROCESS_INFO: /**< 0a ? */
case COM_CONNECT: /**< 0b ? */
@ -474,6 +422,7 @@ static int routeQuery(
"Query type\t%s, routing to Master.",
STRQTYPE(qtype));
ret = session->masterconn->func.write(session->masterconn, queue);
atomic_add(&inst->stats.n_master, 1);
goto return_ret;
break;
@ -483,6 +432,7 @@ static int routeQuery(
"Query type\t%s, routing to Slave.",
STRQTYPE(qtype));
ret = session->slaveconn->func.write(session->slaveconn, queue);
atomic_add(&inst->stats.n_slave, 1);
goto return_ret;
break;
@ -490,23 +440,29 @@ static int routeQuery(
case QUERY_TYPE_SESSION_WRITE:
skygw_log_write(NULL,
LOGFILE_TRACE,
"Query type\t%s, routing to Master.",
"Query type\t%s, routing to All servers.",
STRQTYPE(qtype));
/**
* TODO! Connection to all servers must be established, and
* the command must be executed in them.
*/
ret = session->masterconn->func.write(session->masterconn, queue);
{
GWBUF *cq = gwbuf_clone(queue);
ret = session->masterconn->func.write(session->masterconn, queue);
session->slaveconn->func.write(session->masterconn, cq);
}
atomic_add(&inst->stats.n_all, 1);
goto return_ret;
break;
default:
skygw_log_write(NULL,
LOGFILE_TRACE,
"Query type\t%s, routing to Master.",
"Query type\t%s, routing to Master by default.",
STRQTYPE(qtype));
/** Is this really ok? */
ret = session->masterconn->func.write(session->masterconn, queue);
atomic_add(&inst->stats.n_master, 1);
goto return_ret;
break;
}
@ -527,4 +483,23 @@ return_ret:
static void
diagnostic(ROUTER *instance, DCB *dcb)
{
CLIENT_SESSION *session;
INSTANCE *inst = (INSTANCE *)instance;
int i = 0;
spinlock_acquire(&inst->lock);
session = inst->connections;
while (session)
{
i++;
session = session->next;
}
spinlock_release(&inst->lock);
dcb_printf(dcb, "\tNumber of router sessions: %d\n", inst->stats.n_sessions);
dcb_printf(dcb, "\tCurrent no. of router sessions: %d\n", i);
dcb_printf(dcb, "\tNumber of queries forwarded: %d\n", inst->stats.n_queries);
dcb_printf(dcb, "\tNumber of queries forwarded to master: %d\n", inst->stats.n_master);
dcb_printf(dcb, "\tNumber of queries forwarded to slave: %d\n", inst->stats.n_slave);
dcb_printf(dcb, "\tNumber of queries forwarded to all: %d\n", inst->stats.n_all);
}