Final implementation of client session based sharding.
This commit is contained in:
@ -528,7 +528,7 @@ modutil_count_signal_packets(GWBUF *reply, int use_ok, int n_found)
|
|||||||
unsigned char* ptr = (unsigned char*) reply->start;
|
unsigned char* ptr = (unsigned char*) reply->start;
|
||||||
unsigned char* end = (unsigned char*) reply->end;
|
unsigned char* end = (unsigned char*) reply->end;
|
||||||
unsigned char* prev = ptr;
|
unsigned char* prev = ptr;
|
||||||
int pktlen, eof = 0, err = 0, found = n_found;
|
int pktlen, eof = 0, err = 0;
|
||||||
int errlen = 0, eoflen = 0;
|
int errlen = 0, eoflen = 0;
|
||||||
int iserr = 0, iseof = 0;
|
int iserr = 0, iseof = 0;
|
||||||
while(ptr < end)
|
while(ptr < end)
|
||||||
@ -583,3 +583,180 @@ modutil_count_signal_packets(GWBUF *reply, int use_ok, int n_found)
|
|||||||
|
|
||||||
return(eof + err);
|
return(eof + err);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void resultset_free(RESULTSET* rset)
|
||||||
|
{
|
||||||
|
RSET_ROW *row,*tmp;
|
||||||
|
int i;
|
||||||
|
|
||||||
|
row = rset->head;
|
||||||
|
|
||||||
|
while(row)
|
||||||
|
{
|
||||||
|
tmp = row;
|
||||||
|
row = row->next;
|
||||||
|
for(i = 0;i<rset->columns;i++)
|
||||||
|
{
|
||||||
|
free(tmp->data[i]);
|
||||||
|
}
|
||||||
|
free(tmp);
|
||||||
|
}
|
||||||
|
free(rset);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
char* get_lenenc_str(void* data, int* len)
|
||||||
|
{
|
||||||
|
unsigned char* ptr = data;
|
||||||
|
char* rval;
|
||||||
|
int size, offset;
|
||||||
|
|
||||||
|
if(data == NULL || len == NULL)
|
||||||
|
{
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if(*ptr < 251)
|
||||||
|
{
|
||||||
|
size = *ptr;
|
||||||
|
offset = 1;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
switch(*(ptr))
|
||||||
|
{
|
||||||
|
case 0xfb:
|
||||||
|
*len = 1;
|
||||||
|
return NULL;
|
||||||
|
case 0xfc:
|
||||||
|
size = *(ptr + 1) + (*(ptr + 2) << 8);
|
||||||
|
offset = 2;
|
||||||
|
break;
|
||||||
|
case 0xfd:
|
||||||
|
size = *ptr + (*(ptr + 2) << 8) + (*(ptr + 3) << 16);
|
||||||
|
offset = 3;
|
||||||
|
break;
|
||||||
|
case 0xfe:
|
||||||
|
size = *ptr + ((*(ptr + 2) << 8)) + (*(ptr + 3) << 16) +
|
||||||
|
(*(ptr + 4) << 24) + (*(ptr + 5) << 32) + (*(ptr + 6) << 40) +
|
||||||
|
(*(ptr + 7) << 48) + (*(ptr + 8) << 56);
|
||||||
|
offset = 8;
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
rval = malloc(sizeof(char)*(size+1));
|
||||||
|
if(rval)
|
||||||
|
{
|
||||||
|
memcpy(rval,ptr + offset,size);
|
||||||
|
memset(rval + size,0,1);
|
||||||
|
|
||||||
|
}
|
||||||
|
*len = size + offset;
|
||||||
|
return rval;
|
||||||
|
}
|
||||||
|
|
||||||
|
RESULTSET*
|
||||||
|
modutil_get_rows(GWBUF* buffer)
|
||||||
|
{
|
||||||
|
RESULTSET* result;
|
||||||
|
RSET_ROW* row;
|
||||||
|
int columns,plen,slen,i,offset;
|
||||||
|
unsigned char *ptr,*end;
|
||||||
|
|
||||||
|
if(buffer == NULL)
|
||||||
|
{
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
ptr = (unsigned char*) buffer->start;
|
||||||
|
end = (unsigned char*) buffer->end;
|
||||||
|
|
||||||
|
if(!PTR_IS_RESULTSET(ptr))
|
||||||
|
{
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
columns = *(ptr + 4);
|
||||||
|
|
||||||
|
if((result = calloc(1, sizeof(RESULTSET))) == NULL)
|
||||||
|
{
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
result->columns = columns;
|
||||||
|
|
||||||
|
while(ptr < end)
|
||||||
|
{
|
||||||
|
plen = MYSQL_GET_PACKET_LEN(ptr) + 4;
|
||||||
|
if(PTR_IS_EOF(ptr))
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
ptr += plen;
|
||||||
|
}
|
||||||
|
|
||||||
|
ptr += plen;
|
||||||
|
|
||||||
|
while(ptr < end)
|
||||||
|
{
|
||||||
|
|
||||||
|
plen = MYSQL_GET_PACKET_LEN(ptr) + 4;
|
||||||
|
|
||||||
|
if(PTR_IS_EOF(ptr) || PTR_IS_ERR(ptr))
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* The final EOF/ERR packet was found so the result set is parsed.
|
||||||
|
*/
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
if(ptr + plen > end)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* There is a partial packet in the buffer and this is not a complete
|
||||||
|
* result set. This is considered as an error and will cause the
|
||||||
|
* deallocation of the resultset.
|
||||||
|
*/
|
||||||
|
|
||||||
|
resultset_free(result);
|
||||||
|
result = NULL;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if((row = calloc(1,sizeof(RSET_ROW))) == NULL)
|
||||||
|
{
|
||||||
|
resultset_free(result);
|
||||||
|
result = NULL;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if((row->data = malloc(sizeof(char*)*columns)) == NULL)
|
||||||
|
{
|
||||||
|
free(row);
|
||||||
|
resultset_free(result);
|
||||||
|
result = NULL;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
offset = 4;
|
||||||
|
|
||||||
|
for(i = 0;i<columns;i++)
|
||||||
|
{
|
||||||
|
row->data[i] = get_lenenc_str(ptr + offset,&slen);
|
||||||
|
offset += slen;
|
||||||
|
}
|
||||||
|
|
||||||
|
row->next = result->head;
|
||||||
|
result->rows++;
|
||||||
|
result->head = row;
|
||||||
|
|
||||||
|
ptr += plen;
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
@ -39,6 +39,18 @@
|
|||||||
#define PTR_IS_OK(b) (b[4] == 0x00)
|
#define PTR_IS_OK(b) (b[4] == 0x00)
|
||||||
#define PTR_IS_ERR(b) (b[4] == 0xff)
|
#define PTR_IS_ERR(b) (b[4] == 0xff)
|
||||||
#define PTR_IS_LOCAL_INFILE(b) (b[4] == 0xfb)
|
#define PTR_IS_LOCAL_INFILE(b) (b[4] == 0xfb)
|
||||||
|
#define IS_FULL_RESPONSE(buf) (modutil_count_signal_packets(buf,0,0) == 2)
|
||||||
|
|
||||||
|
typedef struct rset_row_t{
|
||||||
|
char** data;
|
||||||
|
struct rset_row_t* next;
|
||||||
|
} RSET_ROW;
|
||||||
|
|
||||||
|
typedef struct rset_t{
|
||||||
|
int rows;
|
||||||
|
int columns;
|
||||||
|
RSET_ROW* head;
|
||||||
|
}RESULTSET;
|
||||||
|
|
||||||
extern int modutil_is_SQL(GWBUF *);
|
extern int modutil_is_SQL(GWBUF *);
|
||||||
extern int modutil_extract_SQL(GWBUF *, char **, int *);
|
extern int modutil_extract_SQL(GWBUF *, char **, int *);
|
||||||
@ -59,4 +71,6 @@ GWBUF *modutil_create_mysql_err_msg(
|
|||||||
const char *msg);
|
const char *msg);
|
||||||
|
|
||||||
int modutil_count_signal_packets(GWBUF*,int,int);
|
int modutil_count_signal_packets(GWBUF*,int,int);
|
||||||
|
void resultset_free(RESULTSET* rset);
|
||||||
|
RESULTSET* modutil_get_rows(GWBUF*);
|
||||||
#endif
|
#endif
|
||||||
|
@ -72,7 +72,7 @@ extern __thread log_info_t tls_log_info;
|
|||||||
* @endverbatim
|
* @endverbatim
|
||||||
*/
|
*/
|
||||||
|
|
||||||
static char *version_str = "V1.0.2";
|
static char *version_str = "V1.0.0";
|
||||||
|
|
||||||
static ROUTER* createInstance(SERVICE *service, char **options);
|
static ROUTER* createInstance(SERVICE *service, char **options);
|
||||||
static void* newSession(ROUTER *instance, SESSION *session);
|
static void* newSession(ROUTER *instance, SESSION *session);
|
||||||
@ -311,13 +311,31 @@ static void* hfree(void* fval)
|
|||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool parse_showdb_response(ROUTER_CLIENT_SES* rses, GWBUF* buf)
|
bool parse_showdb_response(ROUTER_CLIENT_SES* rses, char* target, GWBUF* buf)
|
||||||
{
|
{
|
||||||
int rval = 0;
|
int rval = 0;
|
||||||
|
RESULTSET* rset;
|
||||||
rval = modutil_count_signal_packets(rses,0,0);
|
RSET_ROW* row;
|
||||||
|
|
||||||
return rval > 1;
|
if(PTR_IS_RESULTSET(((unsigned char*)buf->start)) &&
|
||||||
|
modutil_count_signal_packets(buf,0,0) == 2)
|
||||||
|
{
|
||||||
|
rset = modutil_get_rows(buf);
|
||||||
|
if(rset && rset->columns == 1)
|
||||||
|
{
|
||||||
|
row = rset->head;
|
||||||
|
|
||||||
|
while(row)
|
||||||
|
{
|
||||||
|
hashtable_add(rses->dbhash,row->data[0],target);
|
||||||
|
row = row->next;
|
||||||
|
}
|
||||||
|
resultset_free(rset);
|
||||||
|
rval = 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return rval;
|
||||||
}
|
}
|
||||||
|
|
||||||
int gen_tablelist(ROUTER_INSTANCE* inst, ROUTER_CLIENT_SES* session)
|
int gen_tablelist(ROUTER_INSTANCE* inst, ROUTER_CLIENT_SES* session)
|
||||||
@ -337,7 +355,7 @@ int gen_tablelist(ROUTER_INSTANCE* inst, ROUTER_CLIENT_SES* session)
|
|||||||
*((unsigned char*)buffer->start) = len;
|
*((unsigned char*)buffer->start) = len;
|
||||||
*((unsigned char*)buffer->start + 1) = len>>8;
|
*((unsigned char*)buffer->start + 1) = len>>8;
|
||||||
*((unsigned char*)buffer->start + 2) = len>>16;
|
*((unsigned char*)buffer->start + 2) = len>>16;
|
||||||
*((unsigned char*)buffer->start + 3) = 0x1;
|
*((unsigned char*)buffer->start + 3) = 0x0;
|
||||||
*((unsigned char*)buffer->start + 4) = 0x03;
|
*((unsigned char*)buffer->start + 4) = 0x03;
|
||||||
memcpy(buffer->start + 5,query,strlen(query));
|
memcpy(buffer->start + 5,query,strlen(query));
|
||||||
|
|
||||||
@ -639,7 +657,7 @@ void* dbnames_hash_init(ROUTER_INSTANCE* inst,BACKEND** backends)
|
|||||||
* @return Name of the backend or NULL if the query contains no known databases.
|
* @return Name of the backend or NULL if the query contains no known databases.
|
||||||
*/
|
*/
|
||||||
char* get_shard_target_name(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client, GWBUF* buffer,skygw_query_type_t qtype){
|
char* get_shard_target_name(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client, GWBUF* buffer,skygw_query_type_t qtype){
|
||||||
HASHTABLE* ht = router->dbnames_hash;
|
HASHTABLE* ht = client->dbhash;
|
||||||
int sz = 0,i,j;
|
int sz = 0,i,j;
|
||||||
char** dbnms = NULL;
|
char** dbnms = NULL;
|
||||||
char* rval = NULL;
|
char* rval = NULL;
|
||||||
@ -697,10 +715,10 @@ bool check_shard_status(ROUTER_INSTANCE* router, char* shard)
|
|||||||
{
|
{
|
||||||
rval = true;
|
rval = true;
|
||||||
}
|
}
|
||||||
else
|
/*else
|
||||||
{
|
{
|
||||||
update_dbnames_hash(router,router->servers,router->dbnames_hash);
|
update_dbnames_hash(router,router->servers,router->dbnames_hash);
|
||||||
}
|
}*/
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1759,7 +1777,7 @@ void check_create_tmp_table(
|
|||||||
GWBUF* gen_show_dbs_response(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client)
|
GWBUF* gen_show_dbs_response(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client)
|
||||||
{
|
{
|
||||||
GWBUF* rval = NULL;
|
GWBUF* rval = NULL;
|
||||||
HASHTABLE* ht = router->dbnames_hash;
|
HASHTABLE* ht = client->dbhash;
|
||||||
HASHITERATOR* iter = hashtable_iterator(ht);
|
HASHITERATOR* iter = hashtable_iterator(ht);
|
||||||
BACKEND** backends = router->servers;
|
BACKEND** backends = router->servers;
|
||||||
unsigned int coldef_len = 0;
|
unsigned int coldef_len = 0;
|
||||||
@ -1954,6 +1972,10 @@ static int routeQuery(
|
|||||||
{
|
{
|
||||||
router_cli_ses->queue = querybuf;
|
router_cli_ses->queue = querybuf;
|
||||||
router_cli_ses->dbhash = hashtable_alloc(7, hashkeyfun, hashcmpfun);
|
router_cli_ses->dbhash = hashtable_alloc(7, hashkeyfun, hashcmpfun);
|
||||||
|
hashtable_memory_fns(router_cli_ses->dbhash,(HASHMEMORYFN)strdup,
|
||||||
|
(HASHMEMORYFN)strdup,
|
||||||
|
(HASHMEMORYFN)free,
|
||||||
|
(HASHMEMORYFN)free);
|
||||||
gen_tablelist(inst,router_cli_ses);
|
gen_tablelist(inst,router_cli_ses);
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
@ -2171,7 +2193,7 @@ static int routeQuery(
|
|||||||
unsigned int plen = gw_mysql_get_byte3((unsigned char*)querybuf->start) - 1;
|
unsigned int plen = gw_mysql_get_byte3((unsigned char*)querybuf->start) - 1;
|
||||||
memcpy(dbname,querybuf->start + 5,plen);
|
memcpy(dbname,querybuf->start + 5,plen);
|
||||||
dbname[plen] = '\0';
|
dbname[plen] = '\0';
|
||||||
tname = hashtable_fetch(inst->dbnames_hash,dbname);
|
tname = hashtable_fetch(router_cli_ses->dbhash,dbname);
|
||||||
if(tname)
|
if(tname)
|
||||||
{
|
{
|
||||||
route_target = TARGET_NAMED_SERVER;
|
route_target = TARGET_NAMED_SERVER;
|
||||||
@ -2195,7 +2217,7 @@ static int routeQuery(
|
|||||||
* the target is undefined and an error will be returned to the client.
|
* the target is undefined and an error will be returned to the client.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
update_dbnames_hash(inst,inst->servers,inst->dbnames_hash);
|
//update_dbnames_hash(inst,inst->servers,inst->dbnames_hash);
|
||||||
|
|
||||||
if((tname = get_shard_target_name(inst,router_cli_ses,querybuf,qtype)) != NULL &&
|
if((tname = get_shard_target_name(inst,router_cli_ses,querybuf,qtype)) != NULL &&
|
||||||
check_shard_status(inst,tname))
|
check_shard_status(inst,tname))
|
||||||
@ -2211,7 +2233,7 @@ static int routeQuery(
|
|||||||
* No valid targets found for this query, return an error packet and update the hashtable. This also adds new databases to the hashtable.
|
* No valid targets found for this query, return an error packet and update the hashtable. This also adds new databases to the hashtable.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
update_dbnames_hash(inst,inst->servers,inst->dbnames_hash);
|
//update_dbnames_hash(inst,inst->servers,inst->dbnames_hash);
|
||||||
tname = get_shard_target_name(inst,router_cli_ses,querybuf,qtype);
|
tname = get_shard_target_name(inst,router_cli_ses,querybuf,qtype);
|
||||||
|
|
||||||
if( (tname == NULL &&
|
if( (tname == NULL &&
|
||||||
@ -2694,7 +2716,9 @@ static void clientReply (
|
|||||||
if(bref->bref_dcb == bkrf[i].bref_dcb)
|
if(bref->bref_dcb == bkrf[i].bref_dcb)
|
||||||
{
|
{
|
||||||
router_cli_ses->rses_backend_ref[i].bref_mapped = true;
|
router_cli_ses->rses_backend_ref[i].bref_mapped = true;
|
||||||
parse_showdb_response(router_cli_ses,writebuf);
|
parse_showdb_response(router_cli_ses,
|
||||||
|
router_cli_ses->rses_backend_ref[i].bref_backend->backend_server->unique_name,
|
||||||
|
writebuf);
|
||||||
skygw_log_write_flush(LOGFILE_DEBUG,"session [%p] server '%s' databases mapped.",
|
skygw_log_write_flush(LOGFILE_DEBUG,"session [%p] server '%s' databases mapped.",
|
||||||
router_cli_ses,
|
router_cli_ses,
|
||||||
bref->bref_backend->backend_server->unique_name);
|
bref->bref_backend->backend_server->unique_name);
|
||||||
|
Reference in New Issue
Block a user