Updated schemarouter duplicate DB code based on code review.
This commit is contained in:
@ -46,6 +46,13 @@ typedef enum init_mask
|
|||||||
INIT_FAILED = 0x08
|
INIT_FAILED = 0x08
|
||||||
} init_mask_t;
|
} init_mask_t;
|
||||||
|
|
||||||
|
typedef enum showdb_response
|
||||||
|
{
|
||||||
|
SHOWDB_FULL_RESPONSE,
|
||||||
|
SHOWDB_PARTIAL_RESPONSE,
|
||||||
|
SHOWDB_DUPLICATE_DATABASES,
|
||||||
|
SHOWDB_FATAL_ERROR
|
||||||
|
} showdb_response_t;
|
||||||
/**
|
/**
|
||||||
* The state of the backend server reference
|
* The state of the backend server reference
|
||||||
*/
|
*/
|
||||||
|
@ -38,6 +38,12 @@
|
|||||||
|
|
||||||
#define DEFAULT_REFRESH_INTERVAL 30.0
|
#define DEFAULT_REFRESH_INTERVAL 30.0
|
||||||
|
|
||||||
|
/** Size of the hashtable used to store ignored databases */
|
||||||
|
#define SCHEMAROUTER_HASHSIZE 100
|
||||||
|
|
||||||
|
/** Size of the offset vector used for regex matching */
|
||||||
|
#define SCHEMA_OVEC_SIZE 24
|
||||||
|
|
||||||
MODULE_INFO info = {
|
MODULE_INFO info = {
|
||||||
MODULE_API_ROUTER,
|
MODULE_API_ROUTER,
|
||||||
MODULE_BETA_RELEASE,
|
MODULE_BETA_RELEASE,
|
||||||
@ -315,21 +321,21 @@ char* get_lenenc_str(void* data)
|
|||||||
* @return 1 if a complete response was received, 0 if a partial response was received
|
* @return 1 if a complete response was received, 0 if a partial response was received
|
||||||
* and -1 if a database was found on more than one server.
|
* and -1 if a database was found on more than one server.
|
||||||
*/
|
*/
|
||||||
int parse_showdb_response(ROUTER_CLIENT_SES* rses, backend_ref_t* bref, GWBUF** buffer)
|
showdb_response_t parse_showdb_response(ROUTER_CLIENT_SES* rses, backend_ref_t* bref, GWBUF** buffer)
|
||||||
{
|
{
|
||||||
unsigned char* ptr;
|
unsigned char* ptr;
|
||||||
char* target = bref->bref_backend->backend_server->unique_name;
|
char* target = bref->bref_backend->backend_server->unique_name;
|
||||||
GWBUF* buf;
|
GWBUF* buf;
|
||||||
bool error = false;
|
bool error = false;
|
||||||
int rval = 0;
|
showdb_response_t rval = SHOWDB_PARTIAL_RESPONSE;
|
||||||
|
|
||||||
if (buffer == NULL || *buffer == NULL)
|
if (buffer == NULL || *buffer == NULL)
|
||||||
return -1;
|
return SHOWDB_FATAL_ERROR;
|
||||||
|
|
||||||
buf = modutil_get_complete_packets(buffer);
|
buf = modutil_get_complete_packets(buffer);
|
||||||
|
|
||||||
if (buf == NULL)
|
if (buf == NULL)
|
||||||
return 0;
|
return SHOWDB_PARTIAL_RESPONSE;
|
||||||
|
|
||||||
ptr = (unsigned char*) buf->start;
|
ptr = (unsigned char*) buf->start;
|
||||||
|
|
||||||
@ -337,7 +343,7 @@ int parse_showdb_response(ROUTER_CLIENT_SES* rses, backend_ref_t* bref, GWBUF**
|
|||||||
{
|
{
|
||||||
skygw_log_write(LOGFILE_TRACE, "schemarouter: SHOW DATABASES returned an error.");
|
skygw_log_write(LOGFILE_TRACE, "schemarouter: SHOW DATABASES returned an error.");
|
||||||
gwbuf_free(buf);
|
gwbuf_free(buf);
|
||||||
return -1;
|
return SHOWDB_FATAL_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (bref->n_mapping_eof == 0)
|
if (bref->n_mapping_eof == 0)
|
||||||
@ -352,7 +358,7 @@ int parse_showdb_response(ROUTER_CLIENT_SES* rses, backend_ref_t* bref, GWBUF**
|
|||||||
{
|
{
|
||||||
skygw_log_write(LOGFILE_TRACE, "schemarouter: Malformed packet for SHOW DATABASES.");
|
skygw_log_write(LOGFILE_TRACE, "schemarouter: Malformed packet for SHOW DATABASES.");
|
||||||
*buffer = gwbuf_append(buf, *buffer);
|
*buffer = gwbuf_append(buf, *buffer);
|
||||||
return false;
|
return SHOWDB_FATAL_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
atomic_add(&bref->n_mapping_eof, 1);
|
atomic_add(&bref->n_mapping_eof, 1);
|
||||||
@ -360,41 +366,38 @@ int parse_showdb_response(ROUTER_CLIENT_SES* rses, backend_ref_t* bref, GWBUF**
|
|||||||
ptr += gw_mysql_get_byte3(ptr) + 4;
|
ptr += gw_mysql_get_byte3(ptr) + 4;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (bref->n_mapping_eof == 1)
|
while (ptr < (unsigned char*) buf->end && !PTR_IS_EOF(ptr))
|
||||||
{
|
{
|
||||||
while (ptr < (unsigned char*) buf->end && !PTR_IS_EOF(ptr))
|
int payloadlen = gw_mysql_get_byte3(ptr);
|
||||||
|
int packetlen = payloadlen + 4;
|
||||||
|
char* data = get_lenenc_str(ptr + 4);
|
||||||
|
|
||||||
|
if (data)
|
||||||
{
|
{
|
||||||
int payloadlen = gw_mysql_get_byte3(ptr);
|
if (hashtable_add(rses->dbhash, data, target))
|
||||||
int packetlen = payloadlen + 4;
|
|
||||||
char* data = get_lenenc_str(ptr + 4);
|
|
||||||
|
|
||||||
if (data)
|
|
||||||
{
|
{
|
||||||
if (hashtable_add(rses->dbhash, data, target))
|
skygw_log_write(LOGFILE_TRACE, "schemarouter: <%s, %s>", target, data);
|
||||||
{
|
|
||||||
skygw_log_write(LOGFILE_TRACE, "schemarouter: <%s, %s>", target, data);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
int ovector[24];
|
|
||||||
const int ovec_count = 24;
|
|
||||||
|
|
||||||
if (!(hashtable_fetch(rses->router->ignored_dbs, data) ||
|
|
||||||
(rses->router->ignore_regex &&
|
|
||||||
pcre_exec(rses->router->ignore_regex, NULL, (const char*) data,
|
|
||||||
strlen(data), 0, 0, ovector, ovec_count) >= 0)))
|
|
||||||
{
|
|
||||||
error = true;
|
|
||||||
skygw_log_write(LE, "Error: Database '%s' found on servers '%s' and '%s' for user %s@%s.",
|
|
||||||
data, target, hashtable_fetch(rses->dbhash, data),
|
|
||||||
rses->rses_client_dcb->user,
|
|
||||||
rses->rses_client_dcb->remote);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
free(data);
|
|
||||||
}
|
}
|
||||||
ptr += packetlen;
|
else
|
||||||
|
{
|
||||||
|
const int ovec_count = SCHEMA_OVEC_SIZE;
|
||||||
|
int ovector[ovec_count];
|
||||||
|
|
||||||
|
if (!(hashtable_fetch(rses->router->ignored_dbs, data) ||
|
||||||
|
(rses->router->ignore_regex &&
|
||||||
|
pcre_exec(rses->router->ignore_regex, NULL, data,
|
||||||
|
strlen(data), 0, 0, ovector, ovec_count) >= 0)))
|
||||||
|
{
|
||||||
|
error = true;
|
||||||
|
skygw_log_write(LE, "Error: Database '%s' found on servers '%s' and '%s' for user %s@%s.",
|
||||||
|
data, target, hashtable_fetch(rses->dbhash, data),
|
||||||
|
rses->rses_client_dcb->user,
|
||||||
|
rses->rses_client_dcb->remote);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
free(data);
|
||||||
}
|
}
|
||||||
|
ptr += packetlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ptr < (unsigned char*) buf->end && PTR_IS_EOF(ptr) &&
|
if (ptr < (unsigned char*) buf->end && PTR_IS_EOF(ptr) &&
|
||||||
@ -413,9 +416,9 @@ int parse_showdb_response(ROUTER_CLIENT_SES* rses, backend_ref_t* bref, GWBUF**
|
|||||||
gwbuf_free(buf);
|
gwbuf_free(buf);
|
||||||
|
|
||||||
if (error)
|
if (error)
|
||||||
rval = -1;
|
rval = SHOWDB_DUPLICATE_DATABASES;
|
||||||
else if (bref->n_mapping_eof == 2)
|
else if (bref->n_mapping_eof == 2)
|
||||||
rval = 1;
|
rval = SHOWDB_FULL_RESPONSE;
|
||||||
|
|
||||||
return rval;
|
return rval;
|
||||||
}
|
}
|
||||||
@ -756,7 +759,7 @@ createInstance(SERVICE *service, char **options)
|
|||||||
if ((router = calloc(1, sizeof(ROUTER_INSTANCE))) == NULL) {
|
if ((router = calloc(1, sizeof(ROUTER_INSTANCE))) == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
if((router->ignored_dbs = hashtable_alloc(100, hashkeyfun, hashcmpfun)) == NULL)
|
if((router->ignored_dbs = hashtable_alloc(SCHEMAROUTER_HASHSIZE, hashkeyfun, hashcmpfun)) == NULL)
|
||||||
{
|
{
|
||||||
skygw_log_write(LE,"Error: Memory allocation failed when allocating schemarouter database ignore list.");
|
skygw_log_write(LE,"Error: Memory allocation failed when allocating schemarouter database ignore list.");
|
||||||
free(router);
|
free(router);
|
||||||
@ -1117,7 +1120,7 @@ static void* newSession(
|
|||||||
session,
|
session,
|
||||||
router);
|
router);
|
||||||
|
|
||||||
client_rses->dbhash = hashtable_alloc(100, hashkeyfun, hashcmpfun);
|
client_rses->dbhash = hashtable_alloc(SCHEMAROUTER_HASHSIZE, hashkeyfun, hashcmpfun);
|
||||||
hashtable_memory_fns(client_rses->dbhash,(HASHMEMORYFN)strdup,
|
hashtable_memory_fns(client_rses->dbhash,(HASHMEMORYFN)strdup,
|
||||||
(HASHMEMORYFN)strdup,
|
(HASHMEMORYFN)strdup,
|
||||||
(HASHMEMORYFN)free,
|
(HASHMEMORYFN)free,
|
||||||
@ -1669,7 +1672,7 @@ void check_create_tmp_table(
|
|||||||
if(rses_prop_tmp){
|
if(rses_prop_tmp){
|
||||||
if (rses_prop_tmp->rses_prop_data.temp_tables == NULL)
|
if (rses_prop_tmp->rses_prop_data.temp_tables == NULL)
|
||||||
{
|
{
|
||||||
h = hashtable_alloc(100, hashkeyfun, hashcmpfun);
|
h = hashtable_alloc(SCHEMAROUTER_HASHSIZE, hashkeyfun, hashcmpfun);
|
||||||
hashtable_memory_fns(h,(HASHMEMORYFN)strdup,(HASHMEMORYFN)strdup,(HASHMEMORYFN)free,(HASHMEMORYFN)free);
|
hashtable_memory_fns(h,(HASHMEMORYFN)strdup,(HASHMEMORYFN)strdup,(HASHMEMORYFN)free,(HASHMEMORYFN)free);
|
||||||
if (h != NULL)
|
if (h != NULL)
|
||||||
{
|
{
|
||||||
@ -2137,7 +2140,7 @@ static int routeQuery(
|
|||||||
router_cli_ses->rses_config.last_refresh = now;
|
router_cli_ses->rses_config.last_refresh = now;
|
||||||
router_cli_ses->queue = querybuf;
|
router_cli_ses->queue = querybuf;
|
||||||
hashtable_free(router_cli_ses->dbhash);
|
hashtable_free(router_cli_ses->dbhash);
|
||||||
if((router_cli_ses->dbhash = hashtable_alloc(100, hashkeyfun, hashcmpfun)) == NULL)
|
if((router_cli_ses->dbhash = hashtable_alloc(SCHEMAROUTER_HASHSIZE, hashkeyfun, hashcmpfun)) == NULL)
|
||||||
{
|
{
|
||||||
skygw_log_write(LE,"Error: Hashtable allocation failed.");
|
skygw_log_write(LE,"Error: Hashtable allocation failed.");
|
||||||
rses_end_locked_router_action(router_cli_ses);
|
rses_end_locked_router_action(router_cli_ses);
|
||||||
@ -4594,17 +4597,17 @@ int inspect_backend_mapping_states(ROUTER_CLIENT_SES *router_cli_ses,
|
|||||||
writebuf = gwbuf_append(bref->map_queue, writebuf);
|
writebuf = gwbuf_append(bref->map_queue, writebuf);
|
||||||
bref->map_queue = NULL;
|
bref->map_queue = NULL;
|
||||||
}
|
}
|
||||||
int rc = parse_showdb_response(router_cli_ses,
|
showdb_response_t rc = parse_showdb_response(router_cli_ses,
|
||||||
&router_cli_ses->rses_backend_ref[i],
|
&router_cli_ses->rses_backend_ref[i],
|
||||||
&writebuf);
|
&writebuf);
|
||||||
if (rc == 1)
|
if (rc == SHOWDB_FULL_RESPONSE)
|
||||||
{
|
{
|
||||||
router_cli_ses->rses_backend_ref[i].bref_mapped = true;
|
router_cli_ses->rses_backend_ref[i].bref_mapped = true;
|
||||||
skygw_log_write(LOGFILE_DEBUG, "schemarouter: Received SHOW DATABASES reply from %s for session %p",
|
skygw_log_write(LOGFILE_DEBUG, "schemarouter: Received SHOW DATABASES reply from %s for session %p",
|
||||||
router_cli_ses->rses_backend_ref[i].bref_backend->backend_server->unique_name,
|
router_cli_ses->rses_backend_ref[i].bref_backend->backend_server->unique_name,
|
||||||
router_cli_ses->rses_client_dcb->session);
|
router_cli_ses->rses_client_dcb->session);
|
||||||
}
|
}
|
||||||
else if (rc == 0)
|
else if (rc == SHOWDB_PARTIAL_RESPONSE)
|
||||||
{
|
{
|
||||||
bref->map_queue = writebuf;
|
bref->map_queue = writebuf;
|
||||||
writebuf = NULL;
|
writebuf = NULL;
|
||||||
@ -4618,7 +4621,14 @@ int inspect_backend_mapping_states(ROUTER_CLIENT_SES *router_cli_ses,
|
|||||||
|
|
||||||
if ((router_cli_ses->init & INIT_FAILED) == 0)
|
if ((router_cli_ses->init & INIT_FAILED) == 0)
|
||||||
{
|
{
|
||||||
skygw_log_write(LE, "Error: Duplicate databases found, closing session.");
|
if(rc == SHOWDB_DUPLICATE_DATABASES)
|
||||||
|
{
|
||||||
|
skygw_log_write(LE, "Error: Duplicate databases found, closing session.");
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
skygw_log_write(LE, "Error: Fatal error when processing SHOW DATABASES response, closing session.");
|
||||||
|
}
|
||||||
client_dcb = router_cli_ses->rses_client_dcb;
|
client_dcb = router_cli_ses->rses_client_dcb;
|
||||||
|
|
||||||
/** This is the first response to the database mapping which
|
/** This is the first response to the database mapping which
|
||||||
|
Reference in New Issue
Block a user