Initial implementation of session based sharding.

This commit is contained in:
Markus Makela
2015-01-14 10:34:51 +02:00
parent 0187edf559
commit c1ed24aa78
4 changed files with 108 additions and 11 deletions

View File

@ -112,6 +112,7 @@ bool query_is_parsed(GWBUF* buf);
bool skygw_query_has_clause(GWBUF* buf); bool skygw_query_has_clause(GWBUF* buf);
char* skygw_get_qtype_str(skygw_query_type_t qtype); char* skygw_get_qtype_str(skygw_query_type_t qtype);
char* skygw_get_affected_fields(GWBUF* buf); char* skygw_get_affected_fields(GWBUF* buf);
char** skygw_get_database_names(GWBUF* querybuf,int* size);
EXTERN_C_BLOCK_END EXTERN_C_BLOCK_END

View File

@ -283,7 +283,9 @@ orphan_free(void* data)
while(finished) while(finished)
{ {
#ifdef SS_DEBUG
o_freed++; o_freed++;
#endif
tmp = finished; tmp = finished;
finished = finished->next; finished = finished->next;

View File

@ -55,7 +55,8 @@ typedef enum bref_state {
BREF_IN_USE = 0x01, BREF_IN_USE = 0x01,
BREF_WAITING_RESULT = 0x02, /*< for session commands only */ BREF_WAITING_RESULT = 0x02, /*< for session commands only */
BREF_QUERY_ACTIVE = 0x04, /*< for other queries */ BREF_QUERY_ACTIVE = 0x04, /*< for other queries */
BREF_CLOSED = 0x08 BREF_CLOSED = 0x08,
BREF_DB_MAPPED = 0x10
} bref_state_t; } bref_state_t;
#define BREF_IS_NOT_USED(s) ((s)->bref_state & ~BREF_IN_USE) #define BREF_IS_NOT_USED(s) ((s)->bref_state & ~BREF_IN_USE)
@ -63,7 +64,7 @@ typedef enum bref_state {
#define BREF_IS_WAITING_RESULT(s) ((s)->bref_num_result_wait > 0) #define BREF_IS_WAITING_RESULT(s) ((s)->bref_num_result_wait > 0)
#define BREF_IS_QUERY_ACTIVE(s) ((s)->bref_state & BREF_QUERY_ACTIVE) #define BREF_IS_QUERY_ACTIVE(s) ((s)->bref_state & BREF_QUERY_ACTIVE)
#define BREF_IS_CLOSED(s) ((s)->bref_state & BREF_CLOSED) #define BREF_IS_CLOSED(s) ((s)->bref_state & BREF_CLOSED)
#define BREF_IS_MAPPED(s) ((s)->bref_mapped)
typedef enum backend_type_t { typedef enum backend_type_t {
BE_UNDEFINED=-1, BE_UNDEFINED=-1,
BE_MASTER, BE_MASTER,
@ -206,6 +207,7 @@ typedef struct backend_ref_st {
BACKEND* bref_backend; BACKEND* bref_backend;
DCB* bref_dcb; DCB* bref_dcb;
bref_state_t bref_state; bref_state_t bref_state;
bool bref_mapped;
int bref_num_result_wait; int bref_num_result_wait;
sescmd_cursor_t bref_sescmd_cur; sescmd_cursor_t bref_sescmd_cur;
GWBUF* bref_pending_cmd; /*< For stmt which can't be routed due active sescmd execution */ GWBUF* bref_pending_cmd; /*< For stmt which can't be routed due active sescmd execution */
@ -267,6 +269,9 @@ struct router_client_session {
#endif #endif
struct router_instance *router; /*< The router instance */ struct router_instance *router; /*< The router instance */
struct router_client_session* next; struct router_client_session* next;
HASHTABLE* dbhash;
bool hash_init;
GWBUF* queue;
#if defined(SS_DEBUG) #if defined(SS_DEBUG)
skygw_chk_t rses_chk_tail; skygw_chk_t rses_chk_tail;
#endif #endif

View File

@ -43,6 +43,7 @@ MODULE_INFO info = {
}; };
#if defined(SS_DEBUG) #if defined(SS_DEBUG)
# include <mysql_client_server_protocol.h> # include <mysql_client_server_protocol.h>
#endif #endif
/** Defined in log_manager.cc */ /** Defined in log_manager.cc */
@ -154,12 +155,6 @@ static bool get_shard_dcb(
bool is_ignored_database(ROUTER_INSTANCE* inst, char* str); bool is_ignored_database(ROUTER_INSTANCE* inst, char* str);
#if 0
static void rwsplit_process_router_options(
ROUTER_INSTANCE* router,
char** options);
#endif
static ROUTER_OBJECT MyObject = { static ROUTER_OBJECT MyObject = {
createInstance, createInstance,
newSession, newSession,
@ -316,6 +311,49 @@ static void* hfree(void* fval)
return NULL; return NULL;
} }
bool parse_showdb_response(ROUTER_CLIENT_SES* rses, GWBUF* buf)
{
int rval = 0;
rval = modutil_count_signal_packets(rses,0,0);
return rval > 1;
}
int gen_tablelist(ROUTER_INSTANCE* inst, ROUTER_CLIENT_SES* session)
{
DCB* dcb;
const char* query = "SHOW DATABASES;";
GWBUF *buffer,*clone;
backend_ref_t* backends;
int i,rval;
unsigned int len;
session->hash_init = false;
len = strlen(query);
backends = session->rses_backend_ref;
buffer = gwbuf_alloc(len + 4);
*((unsigned char*)buffer->start) = len;
*((unsigned char*)buffer->start + 1) = len>>8;
*((unsigned char*)buffer->start + 2) = len>>16;
*((unsigned char*)buffer->start + 3) = 0x1;
*((unsigned char*)buffer->start + 4) = 0x03;
memcpy(buffer->start + 5,query,strlen(query));
for(i = 0;i<session->rses_nbackends;i++)
{
clone = gwbuf_clone(buffer);
dcb = backends[i].bref_dcb;
if(BREF_IS_IN_USE(&backends[i]))
{
rval = dcb->func.write(dcb,clone);
}
}
return !rval;
}
/** /**
* Updates the hashtable with the database names and where to find them, adding * Updates the hashtable with the database names and where to find them, adding
@ -333,9 +371,10 @@ bool update_dbnames_hash(ROUTER_INSTANCE* inst,BACKEND** backends, HASHTABLE* ha
bool rval = true; bool rval = true;
SERVER* server; SERVER* server;
int i, rc, numfields; int i, rc, numfields;
for(i = 0;backends[i];i++) for(i = 0;backends[i];i++)
{ {
MYSQL* handle = mysql_init(NULL); MYSQL* handle = mysql_init(NULL);
MYSQL_RES* result = NULL; MYSQL_RES* result = NULL;
MYSQL_ROW row; MYSQL_ROW row;
@ -1911,6 +1950,13 @@ static int routeQuery(
} }
ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(querybuf)); ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(querybuf));
if(router_cli_ses->dbhash == NULL && !router_cli_ses->hash_init)
{
router_cli_ses->queue = querybuf;
router_cli_ses->dbhash = hashtable_alloc(7, hashkeyfun, hashcmpfun);
gen_tablelist(inst,router_cli_ses);
return 1;
}
packet = GWBUF_DATA(querybuf); packet = GWBUF_DATA(querybuf);
packet_type = packet[4]; packet_type = packet[4];
@ -2635,8 +2681,51 @@ static void clientReply (
/** Set response status as replied */ /** Set response status as replied */
bref_clear_state(bref, BREF_WAITING_RESULT); bref_clear_state(bref, BREF_WAITING_RESULT);
} }
if (writebuf != NULL && client_dcb != NULL) if(!router_cli_ses->hash_init)
{
bool mapped = true;
int i;
backend_ref_t* bkrf = router_cli_ses->rses_backend_ref;
for(i = 0; i < router_cli_ses->rses_nbackends; i++)
{
if(bref->bref_dcb == bkrf[i].bref_dcb)
{
router_cli_ses->rses_backend_ref[i].bref_mapped = true;
parse_showdb_response(router_cli_ses,writebuf);
skygw_log_write_flush(LOGFILE_DEBUG,"session [%p] server '%s' databases mapped.",
router_cli_ses,
bref->bref_backend->backend_server->unique_name);
}
if(BREF_IS_IN_USE(&bkrf[i]) &&
!BREF_IS_MAPPED(&bkrf[i]))
{
mapped = false;
skygw_log_write_flush(LOGFILE_DEBUG,"session [%p] server '%s' databases not yet mapped.",
router_cli_ses,
bkrf[i].bref_backend->backend_server->unique_name);
//break;
}
}
gwbuf_free(writebuf);
rses_end_locked_router_action(router_cli_ses);
if(mapped)
{
router_cli_ses->hash_init = true;
routeQuery(instance,router_session,router_cli_ses->queue);
router_cli_ses->queue = NULL;
skygw_log_write_flush(LOGFILE_DEBUG,"session [%p] database map finished.",
router_cli_ses);
}
return;
}
else if (writebuf != NULL && client_dcb != NULL)
{ {
/** Write reply to client DCB */ /** Write reply to client DCB */
SESSION_ROUTE_REPLY(backend_dcb->session, writebuf); SESSION_ROUTE_REPLY(backend_dcb->session, writebuf);