Merge branch 'develop' into mon_script_test

This commit is contained in:
Markus Makela
2015-05-04 09:14:04 +03:00
85 changed files with 2738 additions and 1285 deletions

View File

@ -5,14 +5,6 @@ if(BUILD_TESTS)
install(TARGETS testroute DESTINATION modules)
endif()
add_library(schemarouter SHARED schemarouter/schemarouter.c)
target_link_libraries(schemarouter log_manager utils query_classifier)
install(TARGETS schemarouter DESTINATION modules)
add_library(shardrouter SHARED schemarouter/shardrouter.c)
target_link_libraries(shardrouter log_manager utils query_classifier)
install(TARGETS shardrouter DESTINATION modules)
add_library(readconnroute SHARED readconnroute.c)
target_link_libraries(readconnroute log_manager utils)
install(TARGETS readconnroute DESTINATION modules)
@ -30,7 +22,7 @@ target_link_libraries(cli log_manager utils)
install(TARGETS cli DESTINATION modules)
add_subdirectory(readwritesplit)
add_subdirectory(schemarouter/test)
add_subdirectory(schemarouter)
if(BUILD_BINLOG)
add_subdirectory(binlog)
endif()

View File

@ -164,6 +164,24 @@ blr_file_rotate(ROUTER_INSTANCE *router, char *file, uint64_t pos)
}
/**
* binlog files need an initial 4 magic bytes at the start. blr_file_add_magic()
* adds them.
*
* @param router The router instance
* @param fd file descriptor to the open binlog file
* @return Nothing
*/
static void
blr_file_add_magic(ROUTER_INSTANCE *router, int fd)
{
unsigned char magic[] = BINLOG_MAGIC;
write(fd, magic, 4);
router->binlog_position = 4; /* Initial position after the magic number */
}
/**
* Create a new binlog file for the router to use.
*
@ -176,7 +194,6 @@ blr_file_create(ROUTER_INSTANCE *router, char *file)
{
char path[1024];
int fd;
unsigned char magic[] = BINLOG_MAGIC;
strcpy(path, router->binlogdir);
strcat(path, "/");
@ -184,7 +201,7 @@ unsigned char magic[] = BINLOG_MAGIC;
if ((fd = open(path, O_RDWR|O_CREAT, 0666)) != -1)
{
write(fd, magic, 4);
blr_file_add_magic(router,fd);
}
else
{
@ -197,7 +214,7 @@ unsigned char magic[] = BINLOG_MAGIC;
close(router->binlog_fd);
spinlock_acquire(&router->binlog_lock);
strncpy(router->binlog_name, file,BINLOG_FNAMELEN);
router->binlog_position = 4; /* Initial position after the magic number */
blr_file_add_magic(router, fd);
spinlock_release(&router->binlog_lock);
router->binlog_fd = fd;
return 1;
@ -232,6 +249,19 @@ int fd;
spinlock_acquire(&router->binlog_lock);
strncpy(router->binlog_name, file,BINLOG_FNAMELEN);
router->binlog_position = lseek(fd, 0L, SEEK_END);
if (router->binlog_position < 4) {
if (router->binlog_position == 0) {
blr_file_add_magic(router, fd);
} else {
/* If for any reason the file's length is between 1 and 3 bytes
* then report an error. */
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"%s: binlog file %s has an invalid length %d.",
router->service->name, path, router->binlog_position)));
close(fd);
return;
}
}
spinlock_release(&router->binlog_lock);
router->binlog_fd = fd;
}

View File

@ -1356,7 +1356,7 @@ int action;
"Slave %d is ahead of expected position %s@%d. "
"Expected position %d",
slave->serverid, slave->binlogfile,
slave->binlog_pos,
(unsigned long)slave->binlog_pos,
hdr->next_pos - hdr->event_size)));
spinlock_acquire(&slave->catch_lock);
slave->cstate &= ~(CS_UPTODATE|CS_BUSY);

View File

@ -1203,7 +1203,7 @@ uint32_t chksum;
"%s: COM_BINLOG_DUMP: binlog name '%s', length %d, "
"from position %lu.", router->service->name,
slave->binlogfile, binlognamelen,
slave->binlog_pos)));
(unsigned long)slave->binlog_pos)));
slave->seqno = 1;
@ -1261,7 +1261,7 @@ uint32_t chksum;
"%s: New slave %s, server id %d, requested binlog file %s from position %lu",
router->service->name, slave->dcb->remote,
slave->serverid,
slave->binlogfile, slave->binlog_pos)));
slave->binlogfile, (unsigned long)slave->binlog_pos)));
if (slave->binlog_pos != router->binlog_position ||
strcmp(slave->binlogfile, router->binlog_name) != 0)
@ -1532,7 +1532,7 @@ if (hkheartbeat - beat1 > 1) LOGIF(LE, (skygw_log_write(
"%s: Slave %s is up to date %s, %lu.",
router->service->name,
slave->dcb->remote,
slave->binlogfile, slave->binlog_pos)));
slave->binlogfile, (unsigned long)slave->binlog_pos)));
}
else if ((slave->stats.n_caughtup % 50) == 0)
{
@ -1540,7 +1540,7 @@ if (hkheartbeat - beat1 > 1) LOGIF(LE, (skygw_log_write(
"%s: Slave %s is up to date %s, %lu.",
router->service->name,
slave->dcb->remote,
slave->binlogfile, slave->binlog_pos)));
slave->binlogfile, (unsigned long)slave->binlog_pos)));
}
}
}
@ -1565,7 +1565,7 @@ if (hkheartbeat - beat1 > 1) LOGIF(LE, (skygw_log_write(
"which is not the file currently being downloaded. "
"Master binlog is %s, %lu. This may be caused by a "
"previous failure of the master.",
slave->binlogfile, slave->binlog_pos,
slave->binlogfile, (unsigned long)slave->binlog_pos,
router->binlog_name, router->binlog_position)));
if (blr_slave_fake_rotate(router, slave))
{

View File

@ -755,20 +755,10 @@ routeQuery(ROUTER *instance, void *router_session, GWBUF *queue)
break;
}
CHK_PROTOCOL(((MySQLProtocol*)backend_dcb->protocol));
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [readconnroute:routeQuery] Routed command %d to dcb %p "
"with return value %d.",
pthread_self(),
mysql_command,
backend_dcb,
rc)));
LOGIF(LOGFILE_TRACE,skygw_log_write(
LOGFILE_TRACE,
"Routed command [%#x] to '%s'%s%s",
mysql_command,
LOGFILE_DEBUG|LOGFILE_TRACE,
"Routed [%s] to '%s'%s%s",
STRPACKETTYPE(mysql_command),
backend_dcb->server->unique_name,
trc?": ":".",
trc?trc:""));

View File

@ -628,7 +628,7 @@ createInstance(SERVICE *service, char **options)
* If server weighting has been defined calculate the percentage
* of load that will be sent to each server. This is only used for
* calculating the least connections, either globally or within a
* service, or the numebr of current operations on a server.
* service, or the number of current operations on a server.
*/
if ((weightby = serviceGetWeightingParameter(service)) != NULL)
{
@ -698,6 +698,13 @@ createInstance(SERVICE *service, char **options)
{
rwsplit_process_router_options(router, options);
}
/** These options cancel each other out */
if(router->rwsplit_config.disable_sescmd_hist && router->rwsplit_config.rw_max_sescmd_history_size > 0)
{
router->rwsplit_config.rw_max_sescmd_history_size = 0;
}
/**
* Set default value for max_slave_connections and for slave selection
* criteria. If parameter is set in config file max_slave_connections
@ -807,7 +814,7 @@ static void* newSession(
rwsplit_process_router_options(router, router->service->routerOptions);
}
/** Copy config struct from router instance */
client_rses->rses_config = router->rwsplit_config;
memcpy(&client_rses->rses_config,&router->rwsplit_config,sizeof(rwsplit_config_t));
spinlock_release(&router->lock);
/**
@ -1876,7 +1883,6 @@ static int routeQuery(
bool succp = false;
CHK_CLIENT_RSES(router_cli_ses);
/**
* GWBUF is called "type undefined" when the incoming data isn't parsed
* and MySQL packets haven't been extracted to separate buffers.
@ -2782,16 +2788,16 @@ static void clientReply (
bool rconn = false;
writebuf = sescmd_cursor_process_replies(writebuf, bref, &rconn);
if(rconn)
if(rconn && !router_inst->rwsplit_config.disable_slave_recovery)
{
select_connect_backend_servers(&router_cli_ses->rses_master_ref,
router_cli_ses->rses_backend_ref,
router_cli_ses->rses_nbackends,
router_cli_ses->rses_config.rw_max_slave_conn_count,
router_cli_ses->rses_config.rw_max_slave_replication_lag,
router_cli_ses->rses_config.rw_slave_select_criteria,
router_cli_ses->rses_master_ref->bref_dcb->session,
router_cli_ses->router);
select_connect_backend_servers(&router_cli_ses->rses_master_ref,
router_cli_ses->rses_backend_ref,
router_cli_ses->rses_nbackends,
router_cli_ses->rses_config.rw_max_slave_conn_count,
router_cli_ses->rses_config.rw_max_slave_replication_lag,
router_cli_ses->rses_config.rw_slave_select_criteria,
router_cli_ses->rses_master_ref->bref_dcb->session,
router_cli_ses->router);
}
}
/**
@ -3667,7 +3673,8 @@ static mysql_sescmd_t* mysql_sescmd_init (
/** Set session command buffer */
sescmd->my_sescmd_buf = sescmd_buf;
sescmd->my_sescmd_packet_type = packet_type;
sescmd->position = atomic_add(&rses->pos_generator,1);
return sescmd;
}
@ -3708,13 +3715,11 @@ static GWBUF* sescmd_cursor_process_replies(
mysql_sescmd_t* scmd;
sescmd_cursor_t* scur;
ROUTER_CLIENT_SES* ses;
ROUTER_INSTANCE* router;
scur = &bref->bref_sescmd_cur;
ss_dassert(SPINLOCK_IS_LOCKED(&(scur->scmd_cur_rses->rses_lock)));
scmd = sescmd_cursor_get_command(scur);
ses = (*scur->scmd_cur_ptr_property)->rses_prop_rsession;
router = ses->router;
CHK_GWBUF(replybuf);
/**
@ -3724,6 +3729,7 @@ static GWBUF* sescmd_cursor_process_replies(
while (scmd != NULL && replybuf != NULL)
{
bref->reply_cmd = *((unsigned char*)replybuf->start + 4);
scur->position = scmd->position;
/** Faster backend has already responded to client : discard */
if (scmd->my_sescmd_is_replied)
{
@ -4361,6 +4367,43 @@ static bool route_session_write(
goto return_succp;
}
if(router_cli_ses->rses_config.disable_sescmd_hist)
{
rses_property_t *prop, *tmp;
backend_ref_t* bref;
bool conflict;
prop = router_cli_ses->rses_properties[RSES_PROP_TYPE_SESCMD];
while(prop)
{
conflict = false;
for(i = 0;i<router_cli_ses->rses_nbackends;i++)
{
bref = &backend_ref[i];
if(BREF_IS_IN_USE(bref))
{
if(bref->bref_sescmd_cur.position <= prop->rses_prop_data.sescmd.position)
{
conflict = true;
break;
}
}
}
if(conflict)
{
break;
}
tmp = prop;
router_cli_ses->rses_properties[RSES_PROP_TYPE_SESCMD] = prop->rses_prop_next;
rses_property_done(tmp);
prop = router_cli_ses->rses_properties[RSES_PROP_TYPE_SESCMD];
}
}
/**
* Additional reference is created to querybuf to
* prevent it from being released before properties
@ -4538,6 +4581,14 @@ static void rwsplit_process_router_options(
{
router->rwsplit_config.rw_max_sescmd_history_size = atoi(value);
}
else if(strcmp(options[i],"disable_sescmd_history") == 0)
{
router->rwsplit_config.disable_sescmd_hist = config_truth_value(value);
}
else if(strcmp(options[i],"disable_slave_recovery") == 0)
{
router->rwsplit_config.disable_slave_recovery = config_truth_value(value);
}
}
} /*< for */
}
@ -4780,6 +4831,12 @@ static bool handle_error_new_connection(
* Try to get replacement slave or at least the minimum
* number of slave connections for router session.
*/
if(inst->rwsplit_config.disable_slave_recovery)
{
succp = have_enough_servers(&rses,1,router_nservers,inst) ? true : false;
}
else
{
succp = select_connect_backend_servers(
&rses->rses_master_ref,
rses->rses_backend_ref,
@ -4789,6 +4846,7 @@ static bool handle_error_new_connection(
rses->rses_config.rw_slave_select_criteria,
ses,
inst);
}
return_succp:
return succp;

View File

@ -0,0 +1,11 @@
add_library(schemarouter SHARED schemarouter.c sharding_common.c)
target_link_libraries(schemarouter log_manager utils query_classifier)
install(TARGETS schemarouter DESTINATION modules)
add_library(shardrouter SHARED shardrouter.c svcconn.c sharding_common.c)
target_link_libraries(shardrouter log_manager utils query_classifier)
install(TARGETS shardrouter DESTINATION modules)
if(BUILD_TESTS)
add_subdirectory(test)
endif()

View File

@ -23,6 +23,7 @@
#include <stdint.h>
#include <router.h>
#include <schemarouter.h>
#include <sharding_common.h>
#include <secrets.h>
#include <mysql.h>
#include <skygw_utils.h>
@ -208,12 +209,6 @@ static ROUTER_INSTANCE* instances;
static int hashkeyfun(void* key);
static int hashcmpfun (void *, void *);
static bool change_current_db(
ROUTER_INSTANCE* inst,
ROUTER_CLIENT_SES* rses,
GWBUF* buf);
static int hashkeyfun(void* key)
{
if(key == NULL){
@ -250,7 +245,7 @@ char* get_lenenc_str(void* data, int* len)
{
unsigned char* ptr = (unsigned char*)data;
char* rval;
long size, offset;
unsigned long size, offset;
if(data == NULL || len == NULL)
{
@ -279,9 +274,14 @@ char* get_lenenc_str(void* data, int* len)
offset = 3;
break;
case 0xfe:
size = *ptr + ((*(ptr + 2) << 8)) + (*(ptr + 3) << 16) +
(*(ptr + 4) << 24) + (*(ptr + 5) << 32) + (*(ptr + 6) << 40) +
(*(ptr + 7) << 48) + (*(ptr + 8) << 56);
size = *ptr +
((*(ptr + 2) << 8)) +
(*(ptr + 3) << 16) +
(*(ptr + 4) << 24) +
((unsigned long)*(ptr + 5) << 32) +
((unsigned long)*(ptr + 6) << 40) +
((unsigned long)*(ptr + 7) << 48) +
((unsigned long)*(ptr + 8) << 56);
offset = 8;
break;
default:
@ -311,32 +311,52 @@ char* get_lenenc_str(void* data, int* len)
* @param buf GWBUF containing the result set
* @return True if the buffer contained a result set with a single column. All other responses return false.
*/
bool parse_showdb_response(ROUTER_CLIENT_SES* rses, char* target, GWBUF* buf)
bool parse_showdb_response(ROUTER_CLIENT_SES* rses, backend_ref_t* bref, GWBUF** buffer)
{
bool rval = false;
unsigned char* ptr;
int more = 0;
if(PTR_IS_RESULTSET(((unsigned char*)buf->start)) &&
modutil_count_signal_packets(buf,0,0,&more) == 2)
char* target = bref->bref_backend->backend_server->unique_name;
GWBUF* buf;
if(buffer == NULL || *buffer == NULL)
return false;
buf = modutil_get_complete_packets(buffer);
if(buf == NULL)
return false;
ptr = (unsigned char*)buf->start;
if(PTR_IS_ERR(ptr))
{
skygw_log_write(LOGFILE_TRACE,"schemarouter: SHOW DATABASES returned an error.");
gwbuf_free(buf);
return true;
}
if(bref->n_mapping_eof == 0)
{
ptr = (unsigned char*)buf->start;
if(ptr[4] != 1)
{
/** Something else came back, discard and return with an error*/
return false;
}
/** Skip column definitions */
while(!PTR_IS_EOF(ptr))
while(ptr < (unsigned char*)buf->end && !PTR_IS_EOF(ptr))
{
ptr += gw_mysql_get_byte3(ptr) + 4;
}
if(ptr >= (unsigned char*)buf->end)
{
skygw_log_write(LOGFILE_TRACE,"schemarouter: Malformed packet for SHOW DATABASES.");
*buffer = gwbuf_append(buf,*buffer);
return false;
}
atomic_add(&bref->n_mapping_eof,1);
/** Skip first EOF packet */
ptr += gw_mysql_get_byte3(ptr) + 4;
while(!PTR_IS_EOF(ptr))
}
if(bref->n_mapping_eof == 1)
{
while(ptr < (unsigned char*)buf->end && !PTR_IS_EOF(ptr))
{
int payloadlen = gw_mysql_get_byte3(ptr);
int packetlen = payloadlen + 4;
@ -353,11 +373,23 @@ bool parse_showdb_response(ROUTER_CLIENT_SES* rses, char* target, GWBUF* buf)
}
ptr += packetlen;
}
rval = true;
}
return rval;
if(ptr < (unsigned char*)buf->end && PTR_IS_EOF(ptr) &&
bref->n_mapping_eof == 1)
{
atomic_add(&bref->n_mapping_eof,1);
skygw_log_write(LOGFILE_TRACE,"schemarouter: SHOW DATABASES fully received from %s.",
bref->bref_backend->backend_server->unique_name);
}
else
{
skygw_log_write(LOGFILE_TRACE,"schemarouter: SHOW DATABASES partially received from %s.",
bref->bref_backend->backend_server->unique_name);
}
gwbuf_free(buf);
return bref->n_mapping_eof == 2;
}
/**
@ -372,14 +404,14 @@ bool parse_showdb_response(ROUTER_CLIENT_SES* rses, char* target, GWBUF* buf)
int gen_databaselist(ROUTER_INSTANCE* inst, ROUTER_CLIENT_SES* session)
{
DCB* dcb;
const char* query = "SHOW DATABASES;";
const char* query = "SHOW DATABASES";
GWBUF *buffer,*clone;
int i,rval = 0;
unsigned int len;
session->init |= INIT_MAPPING;
session->init &= ~INIT_UNINT;
len = strlen(query);
len = strlen(query) + 1;
buffer = gwbuf_alloc(len + 4);
*((unsigned char*)buffer->start) = len;
*((unsigned char*)buffer->start + 1) = len>>8;
@ -391,7 +423,8 @@ int gen_databaselist(ROUTER_INSTANCE* inst, ROUTER_CLIENT_SES* session)
for(i = 0;i<session->rses_nbackends;i++)
{
if(BREF_IS_IN_USE(&session->rses_backend_ref[i]) &&
!BREF_IS_CLOSED(&session->rses_backend_ref[i]))
!BREF_IS_CLOSED(&session->rses_backend_ref[i]) &&
SERVER_IS_RUNNING(session->rses_backend_ref[i].bref_backend->backend_server))
{
clone = gwbuf_clone(buffer);
dcb = session->rses_backend_ref[i].bref_dcb;
@ -687,6 +720,13 @@ createInstance(SERVICE *service, char **options)
return NULL;
}
router->service = service;
router->schemarouter_config.max_sescmd_hist = 0;
router->stats.longest_sescmd = 0;
router->stats.n_hist_exceeded = 0;
router->stats.n_queries = 0;
router->stats.n_sescmd = 0;
router->stats.ses_longest = 0;
router->stats.ses_shortest = (double)((unsigned long)(~0));
spinlock_init(&router->lock);
/** Calculate number of servers */
@ -701,6 +741,45 @@ createInstance(SERVICE *service, char **options)
service->users_from_all = true;
}
bool failure = false;
for(i=0;options && options[i];i++)
{
char* value;
if((value = strchr(options[i],'=')) == NULL)
{
skygw_log_write(LOGFILE_ERROR,"Error: Unknown router options for Schemarouter: %s",options[i]);
failure = true;
break;
}
*value = '\0';
value++;
if(strcmp(options[i],"max_sescmd_history") == 0)
{
router->schemarouter_config.max_sescmd_hist = atoi(value);
}
else if(strcmp(options[i],"disable_sescmd_history") == 0)
{
router->schemarouter_config.disable_sescmd_hist = config_truth_value(value);
}
else
{
skygw_log_write(LOGFILE_ERROR,"Error: Unknown router options for Schemarouter: %s",options[i]);
failure = true;
break;
}
}
/** Setting a limit to the history size is not needed if it is disabled.*/
if(router->schemarouter_config.disable_sescmd_hist && router->schemarouter_config.max_sescmd_hist > 0)
router->schemarouter_config.max_sescmd_hist = 0;
if(failure)
{
free(router);
return NULL;
}
while (server != NULL)
{
nservers++;
@ -730,6 +809,7 @@ createInstance(SERVICE *service, char **options)
router->servers[nservers]->backend_conn_count = 0;
router->servers[nservers]->weight = 1;
router->servers[nservers]->be_valid = false;
router->servers[nservers]->stats.queries = 0;
if(server->server->monuser == NULL && service->credentials.name != NULL)
{
router->servers[nservers]->backend_server->monuser =
@ -824,7 +904,7 @@ static void* newSession(
{
protocol->client_capabilities &= ~GW_MYSQL_CAPABILITIES_CONNECT_WITH_DB;
strncpy(db,data->db,MYSQL_DATABASE_MAXLEN+1);
strncpy(db,data->db,MYSQL_DATABASE_MAXLEN);
memset(data->db,0,MYSQL_DATABASE_MAXLEN+1);
using_db = true;
skygw_log_write(LOGFILE_TRACE,"schemarouter: Client logging in directly to a database '%s', "
@ -858,7 +938,8 @@ static void* newSession(
client_rses->dcb_reply->func.read = internalReply;
client_rses->dcb_reply->state = DCB_STATE_POLLING;
client_rses->dcb_reply->session = session;
memcpy(&client_rses->rses_config,&router->schemarouter_config,sizeof(schemarouter_config_t));
client_rses->n_sescmd = 0;
client_rses->dcb_route = dcb_alloc(DCB_ROLE_REQUEST_HANDLER);
client_rses->dcb_route->func.read = internalRoute;
client_rses->dcb_route->state = DCB_STATE_POLLING;
@ -905,6 +986,8 @@ static void* newSession(
backend_ref[i].bref_sescmd_cur.scmd_cur_chk_tail = CHK_NUM_SESCMD_CUR;
#endif
backend_ref[i].bref_state = 0;
backend_ref[i].n_mapping_eof = 0;
backend_ref[i].map_queue = NULL;
backend_ref[i].bref_backend = router->servers[i];
/** store pointers to sescmd list to both cursors */
backend_ref[i].bref_sescmd_cur.scmd_cur_rses = client_rses;
@ -957,7 +1040,6 @@ static void* newSession(
client_rses->rses_capabilities = RCAP_TYPE_STMT_INPUT;
client_rses->rses_backend_ref = backend_ref;
client_rses->rses_nbackends = router_nservers; /*< # of backend servers */
router->stats.n_sessions += 1;
if (!(succp = rses_begin_locked_router_action(client_rses)))
{
@ -977,7 +1059,8 @@ static void* newSession(
rses_end_locked_router_action(client_rses);
atomic_add(&router->stats.sessions, 1);
/**
* Version is bigger than zero once initialized.
@ -1016,6 +1099,7 @@ static void closeSession(
void* router_session)
{
ROUTER_CLIENT_SES* router_cli_ses;
ROUTER_INSTANCE* inst;
backend_ref_t* backend_ref;
LOGIF(LD, (skygw_log_write(LOGFILE_DEBUG,
@ -1032,7 +1116,8 @@ static void closeSession(
}
router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
CHK_CLIENT_RSES(router_cli_ses);
inst = router_cli_ses->router;
backend_ref = router_cli_ses->rses_backend_ref;
/**
* Lock router client session for secure read and update.
@ -1087,9 +1172,28 @@ static void closeSession(
router_cli_ses->dcb_route->session = NULL;
dcb_close(router_cli_ses->dcb_reply);
dcb_close(router_cli_ses->dcb_route);
if(router_cli_ses->queue)
router_cli_ses->queue = gwbuf_consume(
router_cli_ses->queue,gwbuf_length(router_cli_ses->queue));
/** Unlock */
rses_end_locked_router_action(router_cli_ses);
rses_end_locked_router_action(router_cli_ses);
spinlock_acquire(&inst->lock);
if(inst->stats.longest_sescmd < router_cli_ses->stats.longest_sescmd)
inst->stats.longest_sescmd = router_cli_ses->stats.longest_sescmd;
double ses_time = difftime(time(NULL),router_cli_ses->rses_client_dcb->session->stats.connect);
if(inst->stats.ses_longest < ses_time)
inst->stats.ses_longest = ses_time;
if(inst->stats.ses_shortest > ses_time && inst->stats.ses_shortest > 0)
inst->stats.ses_shortest = ses_time;
inst->stats.ses_average =
(ses_time + ((inst->stats.sessions - 1) * inst->stats.ses_average)) /
(inst->stats.sessions);
spinlock_release(&inst->lock);
}
}
@ -1511,6 +1615,12 @@ void check_create_tmp_table(
free(tblname);
}
}
int cmpfn(const void* a, const void *b)
{
return strcmp(*(char**)a,*(char**)b);
}
/**
* Generates a custom SHOW DATABASES result set from all the databases in the
* hashtable. Only backend servers that are up and in a proper state are listed
@ -1621,37 +1731,71 @@ gen_show_dbs_response(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client)
memcpy(ptr, eof, sizeof(eof));
unsigned int packet_num = 4;
int j = 0,ndbs = 0, bufsz = 10;
char** dbs;
if((dbs = malloc(sizeof(char*)*bufsz)) == NULL)
{
gwbuf_free(rval);
hashtable_iterator_free(iter);
return NULL;
}
while((value = (char*) hashtable_next(iter)))
{
char* bend = hashtable_fetch(ht, value);
for(i = 0; backends[i]; i++)
{
if(strcmp(bref[i].bref_backend->backend_server->unique_name, bend) == 0 &&
BREF_IS_IN_USE(&bref[i]) && !BREF_IS_CLOSED(&bref[i]))
{
ndbs++;
GWBUF* temp;
int plen = strlen(value) + 1;
if(ndbs >= bufsz)
{
bufsz += bufsz / 2;
char** tmp = realloc(dbs,sizeof(char*)*bufsz);
if(tmp == NULL)
{
gwbuf_free(rval);
hashtable_iterator_free(iter);
for(i=0;i<ndbs-1;i++)free(dbs[i]);
free(dbs);
return NULL;
}
dbs = tmp;
}
sprintf(dbname, "%s", value);
temp = gwbuf_alloc(plen + 4);
ptr = temp->start;
*ptr++ = plen;
*ptr++ = plen >> 8;
*ptr++ = plen >> 16;
*ptr++ = packet_num++;
*ptr++ = plen - 1;
memcpy(ptr, dbname, plen - 1);
/** Append the row*/
rval = gwbuf_append(rval, temp);
dbs[j++] = strdup(value);
}
}
}
qsort(&dbs[0],(size_t)ndbs,sizeof(char*),cmpfn);
for(j = 0;j<ndbs;j++)
{
GWBUF* temp;
int plen = strlen(dbs[j]) + 1;
sprintf(dbname, "%s", dbs[j]);
temp = gwbuf_alloc(plen + 4);
ptr = temp->start;
*ptr++ = plen;
*ptr++ = plen >> 8;
*ptr++ = plen >> 16;
*ptr++ = packet_num++;
*ptr++ = plen - 1;
memcpy(ptr, dbname, plen - 1);
/** Append the row*/
rval = gwbuf_append(rval, temp);
free(dbs[j]);
}
eof[3] = packet_num;
GWBUF* last_packet = gwbuf_alloc(sizeof(eof));
@ -1660,6 +1804,7 @@ gen_show_dbs_response(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client)
rval = gwbuf_make_contiguous(rval);
hashtable_iterator_free(iter);
free(dbs);
return rval;
}
@ -1687,12 +1832,12 @@ gen_show_dbs_response(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client)
static int routeQuery(
ROUTER* instance,
void* router_session,
GWBUF* querybuf)
GWBUF* qbuf)
{
skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN;
mysql_server_cmd_t packet_type;
uint8_t* packet;
int ret = 0;
int i,ret = 0;
DCB* target_dcb = NULL;
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance;
ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
@ -1701,8 +1846,9 @@ static int routeQuery(
route_target_t route_target = TARGET_UNDEFINED;
bool succp = false;
char* tname = NULL;
GWBUF* querybuf = qbuf;
char db[MYSQL_DATABASE_MAXLEN + 1];
char errbuf[26+MYSQL_DATABASE_MAXLEN];
CHK_CLIENT_RSES(router_cli_ses);
/** Dirty read for quick check if router is closed. */
@ -1734,11 +1880,11 @@ static int routeQuery(
{
char* querystr = modutil_get_SQL(querybuf);
skygw_log_write(LOGFILE_DEBUG,"schemarouter: Storing query for session %p: %s",
skygw_log_write(LOGFILE_DEBUG|LOGFILE_TRACE,"schemarouter: Storing query for session %p: %s",
router_cli_ses->rses_client_dcb->session,
querystr);
free(querystr);
gwbuf_make_contiguous(querybuf);
querybuf = gwbuf_make_contiguous(querybuf);
GWBUF* ptr = router_cli_ses->queue;
while(ptr && ptr->next)
@ -1875,12 +2021,25 @@ static int routeQuery(
if (packet_type == MYSQL_COM_INIT_DB ||
op == QUERY_OP_CHANGE_DB)
{
if (!(change_successful = change_current_db(inst, router_cli_ses, querybuf)))
if (!(change_successful = change_current_db(router_cli_ses->rses_mysql_session,
router_cli_ses->dbhash,
querybuf)))
{
extract_database(querybuf,db);
snprintf(errbuf,25+MYSQL_DATABASE_MAXLEN,"Unknown database: %s",db);
for(i = 0;i<router_cli_ses->rses_nbackends;i++)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Changing database failed.")));
if(SERVER_IS_RUNNING(router_cli_ses->rses_backend_ref[i].bref_backend->backend_server))
{
create_error_reply(errbuf,router_cli_ses->rses_backend_ref[i].bref_dcb);
break;
}
}
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Changing database failed.")));
}
}
if(QUERY_IS_TYPE(qtype, QUERY_TYPE_SHOW_DATABASES))
@ -1997,7 +2156,8 @@ static int routeQuery(
if (succp)
{
atomic_add(&inst->stats.n_all, 1);
atomic_add(&inst->stats.n_sescmd, 1);
atomic_add(&inst->stats.n_queries, 1);
ret = 1;
}
goto retblock;
@ -2198,6 +2358,9 @@ diagnostic(ROUTER *instance, DCB *dcb)
ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance;
int i = 0;
double sescmd_pct = router->stats.n_sescmd != 0 ?
100.0*((double)router->stats.n_sescmd / (double)router->stats.n_queries) :
0.0;
dcb_printf(dcb,"\33[1;4m%-16s%-16s%-16s\33[0m\n","Server","Queries","State");
for(i=0;router->servers[i];i++)
@ -2210,6 +2373,45 @@ diagnostic(ROUTER *instance, DCB *dcb)
"\33[30;41mDOWN\33[0m");
}
/** Session command statistics */
dcb_printf(dcb,"\n\33[1;4mSession Commands\33[0m\n");
dcb_printf(dcb,"Total number of queries: %d\n",
router->stats.n_queries);
dcb_printf(dcb,"Percentage of session commands: %.2f\n",
sescmd_pct);
dcb_printf(dcb,"Longest chain of stored session commands: %d\n",
router->stats.longest_sescmd);
dcb_printf(dcb,"Session command history limit exceeded: %d times\n",
router->stats.n_hist_exceeded);
if(!router->schemarouter_config.disable_sescmd_hist)
{
dcb_printf(dcb,"Session command history: enabled\n");
if(router->schemarouter_config.max_sescmd_hist == 0)
{
dcb_printf(dcb,"Session command history limit: unlimited\n");
}
else
{
dcb_printf(dcb,"Session command history limit: %d\n",
router->schemarouter_config.max_sescmd_hist);
}
}
else
{
dcb_printf(dcb,"Session command history: disabled\n");
}
/** Session time statistics */
if(router->stats.sessions > 0)
{
dcb_printf(dcb,"\n\33[1;4mSession Time Statistics\33[0m\n");
dcb_printf(dcb,"Longest session: %.2lf seconds\n",router->stats.ses_longest);
dcb_printf(dcb,"Shortest session: %.2lf seconds\n",router->stats.ses_shortest);
dcb_printf(dcb,"Average session length: %.2lf seconds\n",router->stats.ses_average);
}
dcb_printf(dcb,"\n");
}
/**
@ -2225,14 +2427,15 @@ diagnostic(ROUTER *instance, DCB *dcb)
static void clientReply (
ROUTER* instance,
void* router_session,
GWBUF* writebuf,
GWBUF* buffer,
DCB* backend_dcb)
{
DCB* client_dcb;
ROUTER_CLIENT_SES* router_cli_ses;
sescmd_cursor_t* scur = NULL;
backend_ref_t* bref;
GWBUF* writebuf = buffer;
router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
CHK_CLIENT_RSES(router_cli_ses);
@ -2303,16 +2506,31 @@ static void clientReply (
for(i = 0; i < router_cli_ses->rses_nbackends; i++)
{
if(bref->bref_dcb == bkrf[i].bref_dcb && !BREF_IS_MAPPED(&bkrf[i]))
{
router_cli_ses->rses_backend_ref[i].bref_mapped = true;
parse_showdb_response(router_cli_ses,
router_cli_ses->rses_backend_ref[i].bref_backend->backend_server->unique_name,
writebuf);
skygw_log_write(LOGFILE_DEBUG,"schemarouter: Received SHOW DATABASES reply from %s for session %p",
if(bref->map_queue)
{
writebuf = gwbuf_append(bref->map_queue,writebuf);
bref->map_queue = NULL;
}
if(parse_showdb_response(router_cli_ses,
&router_cli_ses->rses_backend_ref[i],
&writebuf))
{
router_cli_ses->rses_backend_ref[i].bref_mapped = true;
skygw_log_write(LOGFILE_DEBUG,"schemarouter: Received SHOW DATABASES reply from %s for session %p",
router_cli_ses->rses_backend_ref[i].bref_backend->backend_server->unique_name,
router_cli_ses->rses_client_dcb->session);
}
else
{
bref->map_queue = writebuf;
writebuf = NULL;
skygw_log_write(LOGFILE_DEBUG,"schemarouter: Received partial SHOW DATABASES reply from %s for session %p",
router_cli_ses->rses_backend_ref[i].bref_backend->backend_server->unique_name,
router_cli_ses->rses_client_dcb->session);
}
}
if(BREF_IS_IN_USE(&bkrf[i]) &&
@ -2329,7 +2547,7 @@ static void clientReply (
}
}
while((writebuf = gwbuf_consume(writebuf,gwbuf_length(writebuf))));
while(writebuf && (writebuf = gwbuf_consume(writebuf,gwbuf_length(writebuf))));
if(mapped)
{
@ -2551,8 +2769,7 @@ static void clientReply (
/** There is one pending session command to be executed. */
if (sescmd_cursor_is_active(scur))
{
bool succp;
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Backend %s:%d processed reply and starts to execute "
@ -2560,9 +2777,7 @@ static void clientReply (
bref->bref_backend->backend_server->name,
bref->bref_backend->backend_server->port)));
succp = execute_sescmd_in_backend(bref);
ss_dassert(succp);
execute_sescmd_in_backend(bref);
}
else if (bref->bref_pending_cmd != NULL) /*< non-sescmd is waiting to be routed */
{
@ -3038,7 +3253,7 @@ static mysql_sescmd_t* mysql_sescmd_init (
/** Set session command buffer */
sescmd->my_sescmd_buf = sescmd_buf;
sescmd->my_sescmd_packet_type = packet_type;
sescmd->position = atomic_add(&rses->pos_generator,1);
return sescmd;
}
@ -3090,6 +3305,7 @@ static GWBUF* sescmd_cursor_process_replies(
*/
while (scmd != NULL && replybuf != NULL)
{
scur->position = scmd->position;
/** Faster backend has already responded to client : discard */
if (scmd->my_sescmd_is_replied)
{
@ -3641,17 +3857,74 @@ static bool route_session_write(
succp = false;
goto return_succp;
}
/**
if(router_cli_ses->rses_config.max_sescmd_hist > 0 &&
router_cli_ses->n_sescmd >= router_cli_ses->rses_config.max_sescmd_hist)
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR|LOGFILE_TRACE,
"Router session exceeded session command history limit of %d. "
"Closing router session.",
router_cli_ses->rses_config.max_sescmd_hist)));
gwbuf_free(querybuf);
atomic_add(&router_cli_ses->router->stats.n_hist_exceeded,1);
rses_end_locked_router_action(router_cli_ses);
router_cli_ses->rses_client_dcb->func.hangup(router_cli_ses->rses_client_dcb);
goto return_succp;
}
if(router_cli_ses->rses_config.disable_sescmd_hist)
{
rses_property_t *prop, *tmp;
backend_ref_t* bref;
bool conflict;
prop = router_cli_ses->rses_properties[RSES_PROP_TYPE_SESCMD];
while(prop)
{
conflict = false;
for(i = 0;i<router_cli_ses->rses_nbackends;i++)
{
bref = &backend_ref[i];
if(BREF_IS_IN_USE(bref))
{
if(bref->bref_sescmd_cur.position <= prop->rses_prop_data.sescmd.position)
{
conflict = true;
break;
}
}
}
if(conflict)
{
break;
}
tmp = prop;
router_cli_ses->rses_properties[RSES_PROP_TYPE_SESCMD] = prop->rses_prop_next;
rses_property_done(tmp);
prop = router_cli_ses->rses_properties[RSES_PROP_TYPE_SESCMD];
}
}
/**
*
* Additional reference is created to querybuf to
* prevent it from being released before properties
* are cleaned up as a part of router sessionclean-up.
* are cleaned up as a part of router session clean-up.
*/
prop = rses_property_init(RSES_PROP_TYPE_SESCMD);
mysql_sescmd_init(prop, querybuf, packet_type, router_cli_ses);
/** Add sescmd property to router client session */
rses_property_add(router_cli_ses, prop);
atomic_add(&router_cli_ses->stats.longest_sescmd,1);
atomic_add(&router_cli_ses->n_sescmd,1);
for (i=0; i<router_cli_ses->rses_nbackends; i++)
{
if (BREF_IS_IN_USE((&backend_ref[i])))
@ -3706,6 +3979,10 @@ static bool route_session_write(
backend_ref[i].bref_backend->backend_server->name,
backend_ref[i].bref_backend->backend_server->port)));
}
else
{
atomic_add(&backend_ref[i].bref_backend->stats.queries,1);
}
}
}
else
@ -3885,7 +4162,7 @@ static bool handle_error_new_connection(
{
SESSION* ses;
int router_nservers,i;
unsigned char cmd = *((unsigned char*)errmsg->start + 4);
backend_ref_t* bref;
bool succp;
@ -3960,10 +4237,11 @@ static bool handle_error_new_connection(
}
rses->init |= INIT_MAPPING;
for(i = 0;i<rses->rses_nbackends;i++)
{
bref_clear_state(&rses->rses_backend_ref[i],BREF_DB_MAPPED);
bref_clear_state(&rses->rses_backend_ref[i],BREF_DB_MAPPED);
rses->rses_backend_ref[i].n_mapping_eof = 0;
}
HASHITERATOR* iter = hashtable_iterator(rses->dbhash);
@ -4053,8 +4331,6 @@ router_handle_state_switch(
{
backend_ref_t* bref;
int rc = 1;
ROUTER_CLIENT_SES* rses;
SESSION* ses;
SERVER* srv;
CHK_DCB(dcb);
@ -4067,12 +4343,7 @@ router_handle_state_switch(
{
goto return_rc;
}
ses = dcb->session;
CHK_SESSION(ses);
rses = (ROUTER_CLIENT_SES *) dcb->session->router_session;
CHK_CLIENT_RSES(rses);
switch(reason)
{
case DCB_REASON_NOT_RESPONDING:
@ -4102,172 +4373,4 @@ static sescmd_cursor_t* backend_ref_get_sescmd_cursor (
return scur;
}
/**
* Read new database name from MYSQL_COM_INIT_DB packet, check that it exists
* in the hashtable and copy its name to MYSQL_session.
*
* @param inst Router instance
* @param rses Router client session
* @param buf Query buffer
*
* @return true if new database is set, false if non-existent database was tried
* to be set
*/
static bool change_current_db(
ROUTER_INSTANCE* inst,
ROUTER_CLIENT_SES* rses,
GWBUF* buf)
{
bool succp;
uint8_t* packet;
unsigned int plen;
int message_len;
char* fail_str,*target;
if(GWBUF_LENGTH(buf) <= MYSQL_DATABASE_MAXLEN - 5)
{
packet = GWBUF_DATA(buf);
plen = gw_mysql_get_byte3(packet) - 1;
/** Copy database name from MySQL packet to session */
if(query_classifier_get_operation(buf) == QUERY_OP_CHANGE_DB)
{
char* query = modutil_get_SQL(buf);
char *saved,*tok;
tok = strtok_r(query," ;",&saved);
if(tok == NULL || strcasecmp(tok,"use") != 0)
{
skygw_log_write(LOGFILE_ERROR,"Schemarouter: Malformed chage database packet.");
free(query);
message_len = 25 + MYSQL_DATABASE_MAXLEN;
fail_str = calloc(1, message_len+1);
snprintf(fail_str,
message_len,
"Unknown database '%s'",
(char*)rses->rses_mysql_session->db);
rses->rses_mysql_session->db[0] = '\0';
succp = false;
goto reply_error;
}
tok = strtok_r(NULL," ;",&saved);
if(tok == NULL)
{
skygw_log_write(LOGFILE_ERROR,"Schemarouter: Malformed chage database packet.");
free(query);
message_len = 25 + MYSQL_DATABASE_MAXLEN;
fail_str = calloc(1, message_len+1);
snprintf(fail_str,
message_len,
"Unknown database '%s'",
(char*)rses->rses_mysql_session->db);
rses->rses_mysql_session->db[0] = '\0';
succp = false;
goto reply_error;
}
strncpy(rses->rses_mysql_session->db,tok,MYSQL_DATABASE_MAXLEN);
free(query);
query = NULL;
}
else
{
memcpy(rses->rses_mysql_session->db,packet + 5,plen);
memset(rses->rses_mysql_session->db + plen,0,1);
}
skygw_log_write(LOGFILE_TRACE,"schemarouter: INIT_DB with database '%s'",
rses->rses_mysql_session->db);
/**
* Update the session's active database only if it's in the hashtable.
* If it isn't found, send a custom error packet to the client.
*/
if((target = (char*)hashtable_fetch(
rses->dbhash,
(char*)rses->rses_mysql_session->db)) == NULL)
{
/** Create error message */
message_len = 25 + MYSQL_DATABASE_MAXLEN;
fail_str = calloc(1, message_len+1);
snprintf(fail_str,
message_len,
"Unknown database '%s'",
(char*)rses->rses_mysql_session->db);
rses->rses_mysql_session->db[0] = '\0';
succp = false;
goto reply_error;
}
else
{
skygw_log_write(LOGFILE_TRACE,"schemarouter: database is on server: '%s'.",target);
succp = true;
goto retblock;
}
}
else
{
/** Create error message */
skygw_log_write_flush(LOGFILE_ERROR,
"schemarouter: failed to change database: Query buffer too large");
skygw_log_write_flush(LOGFILE_TRACE,
"schemarouter: failed to change database: Query buffer too large [%d bytes]",GWBUF_LENGTH(buf));
message_len = 25 + MYSQL_DATABASE_MAXLEN;
fail_str = calloc(1, message_len+1);
snprintf(fail_str,
message_len,
"Unknown database '%s'",
(char*)rses->rses_mysql_session->db);
succp = false;
goto reply_error;
}
reply_error:
{
GWBUF* errbuf;
skygw_log_write_flush(
LOGFILE_TRACE,
"schemarouter: failed to change database: %s", fail_str);
errbuf = modutil_create_mysql_err_msg(1, 0, 1049, "42000", fail_str);
free(fail_str);
if (errbuf == NULL)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Creating buffer for error message failed.")));
goto retblock;
}
/** Set flags that help router to identify session commans reply */
gwbuf_set_type(errbuf, GWBUF_TYPE_MYSQL);
gwbuf_set_type(errbuf, GWBUF_TYPE_SESCMD_RESPONSE);
gwbuf_set_type(errbuf, GWBUF_TYPE_RESPONSE_END);
/**
* Create an incoming event for randomly selected backend DCB which
* will then be notified and replied 'back' to the client.
*/
DCB *dcb = NULL;
int i;
for(i = 0;i<rses->rses_nbackends;i++)
{
if(rses->rses_backend_ref[i].bref_dcb){
dcb = rses->rses_backend_ref[i].bref_dcb;
break;
}
}
if(dcb == NULL)
{
skygw_log_write_flush(LOGFILE_ERROR,"Error : All backend connections are down.");
return false;
}
poll_add_epollin_event_to_dcb(rses->dcb_reply,
gwbuf_clone(errbuf));
gwbuf_free(errbuf);
}
retblock:
return succp;
}

View File

@ -0,0 +1,161 @@
/*
* This file is distributed as part of the MariaDB Corporation MaxScale. It is free
* software: you can redistribute it and/or modify it under the terms of the
* GNU General Public License as published by the Free Software Foundation,
* version 2.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along with
* this program; if not, write to the Free Software Foundation, Inc., 51
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Copyright MariaDB Corporation Ab 2013-2015
*/
#include <sharding_common.h>
/** Defined in log_manager.cc */
extern int lm_enabled_logfiles_bitmask;
extern size_t log_ses_count[];
extern __thread log_info_t tls_log_info;
/**
* Extract the database name from a COM_INIT_DB or literal USE ... query.
* @param buf Buffer with the database change query
* @param str Pointer where the database name is copied
* @return True for success, false for failure
*/
bool extract_database(GWBUF* buf, char* str)
{
uint8_t* packet;
char *saved,*tok,*query = NULL;
bool succp = true;
unsigned int plen;
packet = GWBUF_DATA(buf);
plen = gw_mysql_get_byte3(packet) - 1;
/** Copy database name from MySQL packet to session */
if(query_classifier_get_operation(buf) == QUERY_OP_CHANGE_DB)
{
query = modutil_get_SQL(buf);
tok = strtok_r(query," ;",&saved);
if(tok == NULL || strcasecmp(tok,"use") != 0)
{
skygw_log_write(LOGFILE_ERROR,"Schemarouter: Malformed chage database packet.");
succp = false;
goto retblock;
}
tok = strtok_r(NULL," ;",&saved);
if(tok == NULL)
{
skygw_log_write(LOGFILE_ERROR,"Schemarouter: Malformed chage database packet.");
succp = false;
goto retblock;
}
strncpy(str,tok,MYSQL_DATABASE_MAXLEN);
}
else
{
memcpy(str,packet + 5,plen);
memset(str + plen,0,1);
}
retblock:
free(query);
return succp;
}
/**
* Create a fake error message from a DCB.
* @param fail_str Custom error message
* @param dcb DCB to use as the origin of the error
*/
void create_error_reply(char* fail_str,DCB* dcb)
{
skygw_log_write_flush(
LOGFILE_TRACE,
"change_current_db: failed to change database: %s", fail_str);
GWBUF* errbuf = modutil_create_mysql_err_msg(1, 0, 1049, "42000", fail_str);
if (errbuf == NULL)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Creating buffer for error message failed.")));
return;
}
/** Set flags that help router to identify session commands reply */
gwbuf_set_type(errbuf, GWBUF_TYPE_MYSQL);
gwbuf_set_type(errbuf, GWBUF_TYPE_SESCMD_RESPONSE);
gwbuf_set_type(errbuf, GWBUF_TYPE_RESPONSE_END);
poll_add_epollin_event_to_dcb(dcb,
errbuf);
}
/**
* Read new database name from MYSQL_COM_INIT_DB packet or a literal USE ... COM_QUERY packet, check that it exists
* in the hashtable and copy its name to MYSQL_session.
*
* @param mysql_session The MySQL session structure
* @param dbhash Hashtable containing valid databases
* @param buf Buffer containing the database change query
*
* @return true if new database is set, false if non-existent database was tried
* to be set
*/
bool change_current_db(MYSQL_session* mysql_session,
HASHTABLE* dbhash,
GWBUF* buf)
{
char* target;
bool succp;
char db[MYSQL_DATABASE_MAXLEN+1];
if(GWBUF_LENGTH(buf) <= MYSQL_DATABASE_MAXLEN - 5)
{
/** Copy database name from MySQL packet to session */
if(!extract_database(buf,db))
{
succp = false;
goto retblock;
}
skygw_log_write(LOGFILE_TRACE,"change_current_db: INIT_DB with database '%s'",
db);
/**
* Update the session's active database only if it's in the hashtable.
* If it isn't found, send a custom error packet to the client.
*/
if((target = (char*)hashtable_fetch(dbhash,(char*)db)) == NULL)
{
succp = false;
goto retblock;
}
else
{
strncpy(mysql_session->db,db,MYSQL_DATABASE_MAXLEN);
skygw_log_write(LOGFILE_TRACE,"change_current_db: database is on server: '%s'.",target);
succp = true;
goto retblock;
}
}
else
{
/** Create error message */
skygw_log_write_flush(LOGFILE_ERROR,
"change_current_db: failed to change database: Query buffer too large");
skygw_log_write_flush(LOGFILE_TRACE,
"change_current_db: failed to change database: Query buffer too large [%d bytes]",GWBUF_LENGTH(buf));
succp = false;
goto retblock;
}
retblock:
return succp;
}

View File

@ -13,8 +13,9 @@
* this program; if not, write to the Free Software Foundation, Inc., 51
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Copyright MariaDB Corporation Ab 2013-2014
* Copyright MariaDB Corporation Ab 2013-2015
*/
#include <my_config.h>
#include <stdio.h>
#include <strings.h>
@ -24,6 +25,7 @@
#include <router.h>
#include <shardrouter.h>
#include <sharding_common.h>
#include <secrets.h>
#include <mysql.h>
#include <skygw_utils.h>
@ -119,9 +121,9 @@ static route_target_t get_shard_route_target(
static uint8_t getCapabilities(ROUTER* inst, void* router_session);
static void subsvc_clear_state(SUBSERVICE* svc,subsvc_state_t state);
static void subsvc_set_state(SUBSERVICE* svc,subsvc_state_t state);
static bool get_shard_subsvc(SUBSERVICE** subsvc,ROUTER_CLIENT_SES* session,char* target);
void subsvc_clear_state(SUBSERVICE* svc,subsvc_state_t state);
void subsvc_set_state(SUBSERVICE* svc,subsvc_state_t state);
bool get_shard_subsvc(SUBSERVICE** subsvc,ROUTER_CLIENT_SES* session,char* target);
static ROUTER_OBJECT MyObject = {
createInstance,
@ -205,19 +207,6 @@ static void refreshInstance(
CONFIG_PARAMETER* param);
static int router_handle_state_switch(DCB* dcb, DCB_REASON reason, void* data);
/*
static bool handle_error_new_connection(
ROUTER_INSTANCE* inst,
ROUTER_CLIENT_SES* rses,
DCB* backend_dcb,
GWBUF* errmsg);
static void handle_error_reply_client(
SESSION* ses,
ROUTER_CLIENT_SES* rses,
DCB* backend_dcb,
GWBUF* errmsg);
*/
static SPINLOCK instlock;
static ROUTER_INSTANCE* instances;
@ -225,11 +214,6 @@ static ROUTER_INSTANCE* instances;
static int hashkeyfun(void* key);
static int hashcmpfun(void *, void *);
static bool change_current_db(
ROUTER_INSTANCE* inst,
ROUTER_CLIENT_SES* rses,
GWBUF* buf);
static int
hashkeyfun(void* key)
{
@ -300,8 +284,8 @@ char* get_lenenc_str(void* data, int* len)
break;
case 0xfe:
size = *ptr + ((*(ptr + 2) << 8)) + (*(ptr + 3) << 16) +
(*(ptr + 4) << 24) + (*(ptr + 5) << 32) + (*(ptr + 6) << 40) +
(*(ptr + 7) << 48) + (*(ptr + 8) << 56);
(*(ptr + 4) << 24) + ((long)*(ptr + 5) << 32) + ((long)*(ptr + 6) << 40) +
((long)*(ptr + 7) << 48) + ((long)*(ptr + 8) << 56);
offset = 8;
break;
default:
@ -972,6 +956,7 @@ createInstance(SERVICE *service, char **options)
if((res_svc = calloc(sz, sizeof(SERVICE*))) == NULL)
{
free(router);
free(services);
skygw_log_write(LOGFILE_ERROR,"Error: Memory allocation failed.");
return NULL;
}
@ -1190,7 +1175,7 @@ newSession(
atomic_add(&client_rses->rses_versno, 2);
ss_dassert(client_rses->rses_versno == 2);
client_rses->dbhash = hashtable_alloc(100, hashkeyfun, hashcmpfun);
client_rses->dbhash = hashtable_alloc(100, simple_str_hash,strcmp);
hashtable_memory_fns(client_rses->dbhash, (HASHMEMORYFN) strdup,
(HASHMEMORYFN) strdup,
(HASHMEMORYFN) free,
@ -1296,11 +1281,10 @@ freeSession(
void* router_client_session)
{
ROUTER_CLIENT_SES* router_cli_ses;
ROUTER_INSTANCE* router;
int i;
router_cli_ses = (ROUTER_CLIENT_SES *) router_client_session;
router = (ROUTER_INSTANCE *) router_instance;
/**
* For each property type, walk through the list, finalize properties
@ -1564,7 +1548,7 @@ routeQuery(ROUTER* instance,
skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN;
mysql_server_cmd_t packet_type;
uint8_t* packet;
int ret = 1;
int i,ret = 1;
SUBSERVICE* target_subsvc;
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *) instance;
ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *) router_session;
@ -1573,7 +1557,10 @@ routeQuery(ROUTER* instance,
route_target_t route_target = TARGET_UNDEFINED;
bool succp = false;
char* tname = NULL;
skygw_log_write_flush(LOGFILE_TRACE,"shardrouter: routeQuery");
char db[MYSQL_DATABASE_MAXLEN + 1];
char errbuf[26+MYSQL_DATABASE_MAXLEN];
skygw_log_write_flush(LOGFILE_DEBUG,"shardrouter: routeQuery");
CHK_CLIENT_RSES(router_cli_ses);
/** Dirty read for quick check if router is closed. */
@ -1714,8 +1701,13 @@ routeQuery(ROUTER* instance,
if(packet_type == MYSQL_COM_INIT_DB)
{
if(!(change_successful = change_current_db(inst, router_cli_ses, querybuf)))
if(!(change_successful = change_current_db(router_cli_ses->rses_mysql_session,
router_cli_ses->dbhash,
querybuf)))
{
extract_database(querybuf,db);
snprintf(errbuf,"Unknown database: %s",db);
create_error_reply(errbuf,router_cli_ses->replydcb);
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Changing database failed.")));
@ -2066,39 +2058,6 @@ clientReply(
return;
}
static void
subsvc_set_state(SUBSERVICE* svc,subsvc_state_t state)
{
if(state & SUBSVC_WAITING_RESULT)
{
int prev1;
/** Increase waiter count */
prev1 = atomic_add(&svc->n_res_waiting, 1);
ss_dassert(prev1 >= 0);
}
svc->state |= state;
}
static void
subsvc_clear_state(SUBSERVICE* svc,subsvc_state_t state)
{
if(state & SUBSVC_WAITING_RESULT)
{
int prev1;
/** Decrease waiter count */
prev1 = atomic_add(&svc->n_res_waiting, -1);
ss_dassert(prev1 >= 0);
}
svc->state &= ~state;
}
/**
* Create a generic router session property strcture.
*/
@ -2639,8 +2598,7 @@ mysql_sescmd_get_property(
* capabilities specified, rc > 0 when there are capabilities.
*/
static uint8_t
getCapabilities(
ROUTER* inst,
getCapabilities(ROUTER* inst,
void* router_session)
{
ROUTER_CLIENT_SES* rses = (ROUTER_CLIENT_SES *) router_session;
@ -2910,37 +2868,7 @@ handleError(
}
static bool
get_shard_subsvc(SUBSERVICE** subsvc,ROUTER_CLIENT_SES* session,char* target)
{
int i;
if(subsvc == NULL || session == NULL || target == NULL)
return false;
for(i = 0;i<session->n_subservice;i++)
{
if(strcmp(session->subservice[i]->service->name,target) == 0)
{
if (SUBSVC_IS_OK(session->subservice[i]))
{
if(subsvc_is_valid(session->subservice[i])){
*subsvc = session->subservice[i];
return true;
}
/**
* The service has failed
*/
subsvc_set_state(session->subservice[i],SUBSVC_FAILED);
}
}
}
return false;
}
/**
* Finds the subservice who owns this session.
* @param rses Router client session
@ -3008,110 +2936,3 @@ return_rc:
return rc;
#endif
}
/**
* Read new database nbame from MYSQL_COM_INIT_DB packet, check that it exists
* in the hashtable and copy its name to MYSQL_session.
*
* @param inst Router instance
* @param rses Router client session
* @param buf Query buffer
*
* @return true if new database is set, false if non-existent database was tried
* to be set
*/
static bool
change_current_db(
ROUTER_INSTANCE* inst,
ROUTER_CLIENT_SES* rses,
GWBUF* buf)
{
bool succp;
uint8_t* packet;
unsigned int plen;
int message_len;
char* fail_str;
if(GWBUF_LENGTH(buf) <= MYSQL_DATABASE_MAXLEN - 5)
{
packet = GWBUF_DATA(buf);
plen = gw_mysql_get_byte3(packet) - 1;
/** Copy database name from MySQL packet to session */
memcpy(rses->rses_mysql_session->db,
packet + 5,
plen);
memset(rses->rses_mysql_session->db + plen, 0, 1);
/**
* Update the session's active database only if it's in the hashtable.
* If it isn't found, send a custom error packet to the client.
*/
if(hashtable_fetch(
rses->dbhash,
(char*) rses->rses_mysql_session->db) == NULL)
{
/** Create error message */
message_len = 25 + MYSQL_DATABASE_MAXLEN;
fail_str = calloc(1, message_len + 1);
snprintf(fail_str,
message_len,
"Unknown database '%s'",
(char*) rses->rses_mysql_session->db);
rses->rses_mysql_session->db[0] = '\0';
succp = false;
goto reply_error;
}
else
{
succp = true;
goto retblock;
}
}
else
{
/** Create error message */
message_len = 25 + MYSQL_DATABASE_MAXLEN;
fail_str = calloc(1, message_len + 1);
snprintf(fail_str,
message_len,
"Unknown database '%s'",
(char*) rses->rses_mysql_session->db);
succp = false;
goto reply_error;
}
reply_error:
{
GWBUF* errbuf;
skygw_log_write_flush(
LOGFILE_TRACE,
"shardrouter: failed to change database: %s", fail_str);
errbuf = modutil_create_mysql_err_msg(1, 0, 1049, "42000", fail_str);
errbuf = modutil_create_mysql_err_msg(1, 0, 1049, "42000", fail_str);
free(fail_str);
if(errbuf == NULL)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Creating buffer for error message failed.")));
goto retblock;
}
/** Set flags that help router to identify session commans reply */
gwbuf_set_type(errbuf, GWBUF_TYPE_MYSQL);
gwbuf_set_type(errbuf, GWBUF_TYPE_SESCMD_RESPONSE);
gwbuf_set_type(errbuf, GWBUF_TYPE_RESPONSE_END);
/**
* Create an incoming event for randomly selected backend DCB which
* will then be notified and replied 'back' to the client.
*/
poll_add_epollin_event_to_dcb(rses->replydcb,
gwbuf_clone(errbuf));
gwbuf_free(errbuf);
}
retblock:
return succp;
}

View File

@ -0,0 +1,78 @@
/*
* This file is distributed as part of the MariaDB Corporation MaxScale. It is free
* software: you can redistribute it and/or modify it under the terms of the
* GNU General Public License as published by the Free Software Foundation,
* version 2.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along with
* this program; if not, write to the Free Software Foundation, Inc., 51
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Copyright MariaDB Corporation Ab 2013-2014
*/
#include <shardrouter.h>
void
subsvc_set_state(SUBSERVICE* svc,subsvc_state_t state)
{
if(state & SUBSVC_WAITING_RESULT)
{
/** Increase waiter count */
atomic_add(&svc->n_res_waiting, 1);
}
svc->state |= state;
}
void
subsvc_clear_state(SUBSERVICE* svc,subsvc_state_t state)
{
if(state & SUBSVC_WAITING_RESULT)
{
/** Decrease waiter count */
atomic_add(&svc->n_res_waiting, -1);
}
svc->state &= ~state;
}
bool
get_shard_subsvc(SUBSERVICE** subsvc,ROUTER_CLIENT_SES* session,char* target)
{
int i;
if(subsvc == NULL || session == NULL || target == NULL)
return false;
for(i = 0;i<session->n_subservice;i++)
{
if(strcmp(session->subservice[i]->service->name,target) == 0)
{
if (SUBSVC_IS_OK(session->subservice[i]))
{
if(subsvc_is_valid(session->subservice[i])){
*subsvc = session->subservice[i];
return true;
}
/**
* The service has failed
*/
subsvc_set_state(session->subservice[i],SUBSVC_FAILED);
}
}
}
return false;
}