Moved common sharding functions to a separate file.

This commit is contained in:
Markus Makela
2015-05-04 07:04:31 +03:00
parent c8e5df94be
commit 463cd4a97e
7 changed files with 320 additions and 398 deletions

View File

@ -23,6 +23,7 @@
#include <stdint.h>
#include <router.h>
#include <schemarouter.h>
#include <sharding_common.h>
#include <secrets.h>
#include <mysql.h>
#include <skygw_utils.h>
@ -208,12 +209,6 @@ static ROUTER_INSTANCE* instances;
static int hashkeyfun(void* key);
static int hashcmpfun (void *, void *);
static bool change_current_db(
ROUTER_INSTANCE* inst,
ROUTER_CLIENT_SES* rses,
GWBUF* buf);
static int hashkeyfun(void* key)
{
if(key == NULL){
@ -250,7 +245,7 @@ char* get_lenenc_str(void* data, int* len)
{
unsigned char* ptr = (unsigned char*)data;
char* rval;
long size, offset;
unsigned long size, offset;
if(data == NULL || len == NULL)
{
@ -279,9 +274,14 @@ char* get_lenenc_str(void* data, int* len)
offset = 3;
break;
case 0xfe:
size = *ptr + ((*(ptr + 2) << 8)) + (*(ptr + 3) << 16) +
(*(ptr + 4) << 24) + (*(ptr + 5) << 32) + (*(ptr + 6) << 40) +
(*(ptr + 7) << 48) + (*(ptr + 8) << 56);
size = *ptr +
((*(ptr + 2) << 8)) +
(*(ptr + 3) << 16) +
(*(ptr + 4) << 24) +
((unsigned long)*(ptr + 5) << 32) +
((unsigned long)*(ptr + 6) << 40) +
((unsigned long)*(ptr + 7) << 48) +
((unsigned long)*(ptr + 8) << 56);
offset = 8;
break;
default:
@ -1837,7 +1837,7 @@ static int routeQuery(
skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN;
mysql_server_cmd_t packet_type;
uint8_t* packet;
int ret = 0;
int i,ret = 0;
DCB* target_dcb = NULL;
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance;
ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
@ -1847,7 +1847,8 @@ static int routeQuery(
bool succp = false;
char* tname = NULL;
GWBUF* querybuf = qbuf;
char db[MYSQL_DATABASE_MAXLEN + 1];
char errbuf[26+MYSQL_DATABASE_MAXLEN];
CHK_CLIENT_RSES(router_cli_ses);
/** Dirty read for quick check if router is closed. */
@ -2020,12 +2021,25 @@ static int routeQuery(
if (packet_type == MYSQL_COM_INIT_DB ||
op == QUERY_OP_CHANGE_DB)
{
if (!(change_successful = change_current_db(inst, router_cli_ses, querybuf)))
if (!(change_successful = change_current_db(router_cli_ses->rses_mysql_session,
router_cli_ses->dbhash,
querybuf)))
{
extract_database(querybuf,db);
snprintf(errbuf,25+MYSQL_DATABASE_MAXLEN,"Unknown database: %s",db);
for(i = 0;i<router_cli_ses->rses_nbackends;i++)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Changing database failed.")));
if(SERVER_IS_RUNNING(router_cli_ses->rses_backend_ref[i].bref_backend->backend_server))
{
create_error_reply(errbuf,router_cli_ses->rses_backend_ref[i].bref_dcb);
break;
}
}
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Changing database failed.")));
}
}
if(QUERY_IS_TYPE(qtype, QUERY_TYPE_SHOW_DATABASES))
@ -2755,8 +2769,7 @@ static void clientReply (
/** There is one pending session command to be executed. */
if (sescmd_cursor_is_active(scur))
{
bool succp;
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Backend %s:%d processed reply and starts to execute "
@ -2764,9 +2777,7 @@ static void clientReply (
bref->bref_backend->backend_server->name,
bref->bref_backend->backend_server->port)));
succp = execute_sescmd_in_backend(bref);
ss_dassert(succp);
execute_sescmd_in_backend(bref);
}
else if (bref->bref_pending_cmd != NULL) /*< non-sescmd is waiting to be routed */
{
@ -4320,8 +4331,6 @@ router_handle_state_switch(
{
backend_ref_t* bref;
int rc = 1;
ROUTER_CLIENT_SES* rses;
SESSION* ses;
SERVER* srv;
CHK_DCB(dcb);
@ -4334,12 +4343,7 @@ router_handle_state_switch(
{
goto return_rc;
}
ses = dcb->session;
CHK_SESSION(ses);
rses = (ROUTER_CLIENT_SES *) dcb->session->router_session;
CHK_CLIENT_RSES(rses);
switch(reason)
{
case DCB_REASON_NOT_RESPONDING:
@ -4369,172 +4373,4 @@ static sescmd_cursor_t* backend_ref_get_sescmd_cursor (
return scur;
}
/**
* Read new database name from MYSQL_COM_INIT_DB packet, check that it exists
* in the hashtable and copy its name to MYSQL_session.
*
* @param inst Router instance
* @param rses Router client session
* @param buf Query buffer
*
* @return true if new database is set, false if non-existent database was tried
* to be set
*/
static bool change_current_db(
ROUTER_INSTANCE* inst,
ROUTER_CLIENT_SES* rses,
GWBUF* buf)
{
bool succp;
uint8_t* packet;
unsigned int plen;
int message_len;
char* fail_str,*target;
if(GWBUF_LENGTH(buf) <= MYSQL_DATABASE_MAXLEN - 5)
{
packet = GWBUF_DATA(buf);
plen = gw_mysql_get_byte3(packet) - 1;
/** Copy database name from MySQL packet to session */
if(query_classifier_get_operation(buf) == QUERY_OP_CHANGE_DB)
{
char* query = modutil_get_SQL(buf);
char *saved,*tok;
tok = strtok_r(query," ;",&saved);
if(tok == NULL || strcasecmp(tok,"use") != 0)
{
skygw_log_write(LOGFILE_ERROR,"Schemarouter: Malformed chage database packet.");
free(query);
message_len = 25 + MYSQL_DATABASE_MAXLEN;
fail_str = calloc(1, message_len+1);
snprintf(fail_str,
message_len,
"Unknown database '%s'",
(char*)rses->rses_mysql_session->db);
rses->rses_mysql_session->db[0] = '\0';
succp = false;
goto reply_error;
}
tok = strtok_r(NULL," ;",&saved);
if(tok == NULL)
{
skygw_log_write(LOGFILE_ERROR,"Schemarouter: Malformed chage database packet.");
free(query);
message_len = 25 + MYSQL_DATABASE_MAXLEN;
fail_str = calloc(1, message_len+1);
snprintf(fail_str,
message_len,
"Unknown database '%s'",
(char*)rses->rses_mysql_session->db);
rses->rses_mysql_session->db[0] = '\0';
succp = false;
goto reply_error;
}
strncpy(rses->rses_mysql_session->db,tok,MYSQL_DATABASE_MAXLEN);
free(query);
query = NULL;
}
else
{
memcpy(rses->rses_mysql_session->db,packet + 5,plen);
memset(rses->rses_mysql_session->db + plen,0,1);
}
skygw_log_write(LOGFILE_TRACE,"schemarouter: INIT_DB with database '%s'",
rses->rses_mysql_session->db);
/**
* Update the session's active database only if it's in the hashtable.
* If it isn't found, send a custom error packet to the client.
*/
if((target = (char*)hashtable_fetch(
rses->dbhash,
(char*)rses->rses_mysql_session->db)) == NULL)
{
/** Create error message */
message_len = 25 + MYSQL_DATABASE_MAXLEN;
fail_str = calloc(1, message_len+1);
snprintf(fail_str,
message_len,
"Unknown database '%s'",
(char*)rses->rses_mysql_session->db);
rses->rses_mysql_session->db[0] = '\0';
succp = false;
goto reply_error;
}
else
{
skygw_log_write(LOGFILE_TRACE,"schemarouter: database is on server: '%s'.",target);
succp = true;
goto retblock;
}
}
else
{
/** Create error message */
skygw_log_write_flush(LOGFILE_ERROR,
"schemarouter: failed to change database: Query buffer too large");
skygw_log_write_flush(LOGFILE_TRACE,
"schemarouter: failed to change database: Query buffer too large [%d bytes]",GWBUF_LENGTH(buf));
message_len = 25 + MYSQL_DATABASE_MAXLEN;
fail_str = calloc(1, message_len+1);
snprintf(fail_str,
message_len,
"Unknown database '%s'",
(char*)rses->rses_mysql_session->db);
succp = false;
goto reply_error;
}
reply_error:
{
GWBUF* errbuf;
skygw_log_write_flush(
LOGFILE_TRACE,
"schemarouter: failed to change database: %s", fail_str);
errbuf = modutil_create_mysql_err_msg(1, 0, 1049, "42000", fail_str);
free(fail_str);
if (errbuf == NULL)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Creating buffer for error message failed.")));
goto retblock;
}
/** Set flags that help router to identify session commans reply */
gwbuf_set_type(errbuf, GWBUF_TYPE_MYSQL);
gwbuf_set_type(errbuf, GWBUF_TYPE_SESCMD_RESPONSE);
gwbuf_set_type(errbuf, GWBUF_TYPE_RESPONSE_END);
/**
* Create an incoming event for randomly selected backend DCB which
* will then be notified and replied 'back' to the client.
*/
DCB *dcb = NULL;
int i;
for(i = 0;i<rses->rses_nbackends;i++)
{
if(rses->rses_backend_ref[i].bref_dcb){
dcb = rses->rses_backend_ref[i].bref_dcb;
break;
}
}
if(dcb == NULL)
{
skygw_log_write_flush(LOGFILE_ERROR,"Error : All backend connections are down.");
return false;
}
poll_add_epollin_event_to_dcb(rses->dcb_reply,
gwbuf_clone(errbuf));
gwbuf_free(errbuf);
}
retblock:
return succp;
}