Aadded some of the code from routeQuery to its own function, change_current_db, which extracts db name, tests if the name can be found among cached database names (hashtable) and if it does, change the db. Otherwise create an error message and add it to poll eventqueue like it was sent from some of the backends.
This commit is contained in:
@ -250,7 +250,6 @@ static void refreshInstance(
|
|||||||
static void bref_clear_state(backend_ref_t* bref, bref_state_t state);
|
static void bref_clear_state(backend_ref_t* bref, bref_state_t state);
|
||||||
static void bref_set_state(backend_ref_t* bref, bref_state_t state);
|
static void bref_set_state(backend_ref_t* bref, bref_state_t state);
|
||||||
static sescmd_cursor_t* backend_ref_get_sescmd_cursor (backend_ref_t* bref);
|
static sescmd_cursor_t* backend_ref_get_sescmd_cursor (backend_ref_t* bref);
|
||||||
|
|
||||||
static int router_handle_state_switch(DCB* dcb, DCB_REASON reason, void* data);
|
static int router_handle_state_switch(DCB* dcb, DCB_REASON reason, void* data);
|
||||||
static bool handle_error_new_connection(
|
static bool handle_error_new_connection(
|
||||||
ROUTER_INSTANCE* inst,
|
ROUTER_INSTANCE* inst,
|
||||||
@ -263,12 +262,6 @@ static void handle_error_reply_client(
|
|||||||
DCB* backend_dcb,
|
DCB* backend_dcb,
|
||||||
GWBUF* errmsg);
|
GWBUF* errmsg);
|
||||||
|
|
||||||
static backend_ref_t* get_root_master_bref(ROUTER_CLIENT_SES* rses);
|
|
||||||
|
|
||||||
static BACKEND* get_root_master(
|
|
||||||
backend_ref_t* servers,
|
|
||||||
int router_nservers);
|
|
||||||
|
|
||||||
|
|
||||||
static SPINLOCK instlock;
|
static SPINLOCK instlock;
|
||||||
static ROUTER_INSTANCE* instances;
|
static ROUTER_INSTANCE* instances;
|
||||||
@ -276,8 +269,13 @@ static ROUTER_INSTANCE* instances;
|
|||||||
static int hashkeyfun(void* key);
|
static int hashkeyfun(void* key);
|
||||||
static int hashcmpfun (void *, void *);
|
static int hashcmpfun (void *, void *);
|
||||||
|
|
||||||
static int hashkeyfun(
|
static bool change_current_db(
|
||||||
void* key)
|
ROUTER_INSTANCE* inst,
|
||||||
|
ROUTER_CLIENT_SES* rses,
|
||||||
|
GWBUF* buf);
|
||||||
|
|
||||||
|
|
||||||
|
static int hashkeyfun(void* key)
|
||||||
{
|
{
|
||||||
if(key == NULL){
|
if(key == NULL){
|
||||||
return 0;
|
return 0;
|
||||||
@ -341,7 +339,7 @@ bool update_dbnames_hash(BACKEND** backends, HASHTABLE* hashtable)
|
|||||||
{
|
{
|
||||||
LOGIF(LE, (skygw_log_write_flush(
|
LOGIF(LE, (skygw_log_write_flush(
|
||||||
LOGFILE_ERROR,
|
LOGFILE_ERROR,
|
||||||
"Error: Failed to initialize "
|
"Error: failed to initialize "
|
||||||
"MySQL handle.")));
|
"MySQL handle.")));
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
@ -356,7 +354,7 @@ bool update_dbnames_hash(BACKEND** backends, HASHTABLE* hashtable)
|
|||||||
{
|
{
|
||||||
LOGIF(LE, (skygw_log_write_flush(
|
LOGIF(LE, (skygw_log_write_flush(
|
||||||
LOGFILE_ERROR,
|
LOGFILE_ERROR,
|
||||||
"Error: Failed to set MySQL connection options.")));
|
"Error: failed to set MySQL connection options.")));
|
||||||
mysql_close(handle);
|
mysql_close(handle);
|
||||||
rval = false;
|
rval = false;
|
||||||
continue;
|
continue;
|
||||||
@ -369,7 +367,7 @@ bool update_dbnames_hash(BACKEND** backends, HASHTABLE* hashtable)
|
|||||||
{
|
{
|
||||||
LOGIF(LE, (skygw_log_write_flush(
|
LOGIF(LE, (skygw_log_write_flush(
|
||||||
LOGFILE_ERROR,
|
LOGFILE_ERROR,
|
||||||
"Error: No username or password "
|
"Error: no username or password "
|
||||||
"defined for server '%s'.",
|
"defined for server '%s'.",
|
||||||
server->unique_name)));
|
server->unique_name)));
|
||||||
rval = false;
|
rval = false;
|
||||||
@ -390,7 +388,7 @@ bool update_dbnames_hash(BACKEND** backends, HASHTABLE* hashtable)
|
|||||||
{
|
{
|
||||||
LOGIF(LE, (skygw_log_write_flush(
|
LOGIF(LE, (skygw_log_write_flush(
|
||||||
LOGFILE_ERROR,
|
LOGFILE_ERROR,
|
||||||
"Error: Failed to connect to backend "
|
"Error: failed to connect to backend "
|
||||||
"server '%s': %d %s",
|
"server '%s': %d %s",
|
||||||
server->name,
|
server->name,
|
||||||
mysql_errno(handle),
|
mysql_errno(handle),
|
||||||
@ -405,7 +403,7 @@ bool update_dbnames_hash(BACKEND** backends, HASHTABLE* hashtable)
|
|||||||
if((result = mysql_list_dbs(handle,NULL)) == NULL)
|
if((result = mysql_list_dbs(handle,NULL)) == NULL)
|
||||||
{
|
{
|
||||||
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
|
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
|
||||||
"Error: Failed to retrieve databases from backend "
|
"Error: failed to retrieve databases from backend "
|
||||||
"server '%s': %d %s",
|
"server '%s': %d %s",
|
||||||
server->name,
|
server->name,
|
||||||
mysql_errno(handle),
|
mysql_errno(handle),
|
||||||
@ -488,7 +486,7 @@ bool update_dbnames_hash(BACKEND** backends, HASHTABLE* hashtable)
|
|||||||
{
|
{
|
||||||
LOGIF(LE, (skygw_log_write_flush(
|
LOGIF(LE, (skygw_log_write_flush(
|
||||||
LOGFILE_ERROR,
|
LOGFILE_ERROR,
|
||||||
"Error: Failed to insert values into hashtable.")));
|
"Error: failed to insert values into hashtable.")));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
goto cleanup;
|
goto cleanup;
|
||||||
@ -499,13 +497,13 @@ bool update_dbnames_hash(BACKEND** backends, HASHTABLE* hashtable)
|
|||||||
{
|
{
|
||||||
LOGIF(LE, (skygw_log_write_flush(
|
LOGIF(LE, (skygw_log_write_flush(
|
||||||
LOGFILE_ERROR,
|
LOGFILE_ERROR,
|
||||||
"Error: Failed to insert values into hashtable.")));
|
"Error: failed to insert values into hashtable.")));
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
LOGIF(LE, (skygw_log_write_flush(
|
LOGIF(LE, (skygw_log_write_flush(
|
||||||
LOGFILE_ERROR,
|
LOGFILE_ERROR,
|
||||||
"Error : Conflicting "
|
"Error : conflicting "
|
||||||
"databases found. "
|
"databases found. "
|
||||||
"Both \"%s\" and \"%s\" "
|
"Both \"%s\" and \"%s\" "
|
||||||
"have a database \"%s\".",
|
"have a database \"%s\".",
|
||||||
@ -545,7 +543,7 @@ void* dbnames_hash_init(BACKEND** backends)
|
|||||||
{
|
{
|
||||||
LOGIF(LE, (skygw_log_write_flush(
|
LOGIF(LE, (skygw_log_write_flush(
|
||||||
LOGFILE_ERROR,
|
LOGFILE_ERROR,
|
||||||
"Error: Hashtable allocation failed.")));
|
"Error: hashtable allocation failed.")));
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
/**Update the new hashtable with the key-value pairs*/
|
/**Update the new hashtable with the key-value pairs*/
|
||||||
@ -841,6 +839,10 @@ createInstance(SERVICE *service, char **options)
|
|||||||
|
|
||||||
if (router->dbnames_hash == NULL)
|
if (router->dbnames_hash == NULL)
|
||||||
{
|
{
|
||||||
|
LOGIF(LE, (skygw_log_write_flush(
|
||||||
|
LOGFILE_ERROR,
|
||||||
|
"Error : reading database names encountered an error. "
|
||||||
|
"Router instance can't be created.")));
|
||||||
goto clean_up;
|
goto clean_up;
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
@ -1643,8 +1645,6 @@ static int routeQuery(
|
|||||||
}
|
}
|
||||||
ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(querybuf));
|
ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(querybuf));
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
packet = GWBUF_DATA(querybuf);
|
packet = GWBUF_DATA(querybuf);
|
||||||
packet_type = packet[4];
|
packet_type = packet[4];
|
||||||
|
|
||||||
@ -1681,28 +1681,6 @@ static int routeQuery(
|
|||||||
switch(packet_type) {
|
switch(packet_type) {
|
||||||
case MYSQL_COM_QUIT: /*< 1 QUIT will close all sessions */
|
case MYSQL_COM_QUIT: /*< 1 QUIT will close all sessions */
|
||||||
case MYSQL_COM_INIT_DB: /*< 2 DDL must go to the master */
|
case MYSQL_COM_INIT_DB: /*< 2 DDL must go to the master */
|
||||||
|
|
||||||
if(GWBUF_LENGTH(querybuf) <= MYSQL_DATABASE_MAXLEN - 5)
|
|
||||||
{
|
|
||||||
|
|
||||||
strncpy(router_cli_ses->rses_mysql_session->db,
|
|
||||||
(char*)(packet + 5),
|
|
||||||
(int)(GWBUF_LENGTH(querybuf) - 5));
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Update the session's active database only if it's in the hashtable.
|
|
||||||
* If it isn't found, send a custom error packet to the client.
|
|
||||||
*/
|
|
||||||
|
|
||||||
if(hashtable_fetch(inst->dbnames_hash,
|
|
||||||
(char*)router_cli_ses->rses_mysql_session->db) == NULL)
|
|
||||||
{
|
|
||||||
router_cli_ses->rses_mysql_session->db[0] = '\0';
|
|
||||||
|
|
||||||
/**TODO: Add error packet*/
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
case MYSQL_COM_REFRESH: /*< 7 - I guess this is session but not sure */
|
case MYSQL_COM_REFRESH: /*< 7 - I guess this is session but not sure */
|
||||||
case MYSQL_COM_DEBUG: /*< 0d all servers dump debug info to stdout */
|
case MYSQL_COM_DEBUG: /*< 0d all servers dump debug info to stdout */
|
||||||
case MYSQL_COM_PING: /*< 0e all servers are pinged */
|
case MYSQL_COM_PING: /*< 0e all servers are pinged */
|
||||||
@ -1744,6 +1722,18 @@ static int routeQuery(
|
|||||||
break;
|
break;
|
||||||
} /**< switch by packet type */
|
} /**< switch by packet type */
|
||||||
|
|
||||||
|
if (packet_type == MYSQL_COM_INIT_DB)
|
||||||
|
{
|
||||||
|
if (!change_current_db(instance, router_cli_ses, querybuf))
|
||||||
|
{
|
||||||
|
LOGIF(LE, (skygw_log_write_flush(
|
||||||
|
LOGFILE_ERROR,
|
||||||
|
"Error : Changing database failed.")));
|
||||||
|
}
|
||||||
|
ret = 1;
|
||||||
|
goto retblock;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* !!! Temporary tablen tutkiminen voi olla turhaa. Poista tarvittaessa.
|
* !!! Temporary tablen tutkiminen voi olla turhaa. Poista tarvittaessa.
|
||||||
*/
|
*/
|
||||||
@ -1845,7 +1835,6 @@ static int routeQuery(
|
|||||||
/**
|
/**
|
||||||
* Added for simple sharding, using hints for testing.
|
* Added for simple sharding, using hints for testing.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
if((tname = get_shard_target_name(inst,router_cli_ses,querybuf)) != NULL)
|
if((tname = get_shard_target_name(inst,router_cli_ses,querybuf)) != NULL)
|
||||||
{
|
{
|
||||||
route_target = TARGET_NAMED_SERVER;
|
route_target = TARGET_NAMED_SERVER;
|
||||||
@ -4128,3 +4117,97 @@ static void dbshard_process_router_options(
|
|||||||
}
|
}
|
||||||
} /*< for */
|
} /*< for */
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Read new database nbame from MYSQL_COM_INIT_DB packet, check that it exists
|
||||||
|
* in the hashtable and copy its name to MYSQL_session.
|
||||||
|
*
|
||||||
|
* @param inst Router instance
|
||||||
|
* @param rses Router client session
|
||||||
|
* @param buf Query buffer
|
||||||
|
*
|
||||||
|
* @return true if new database is set, false if non-existent database was tried
|
||||||
|
* to be set
|
||||||
|
*/
|
||||||
|
static bool change_current_db(
|
||||||
|
ROUTER_INSTANCE* inst,
|
||||||
|
ROUTER_CLIENT_SES* rses,
|
||||||
|
GWBUF* buf)
|
||||||
|
{
|
||||||
|
bool succp;
|
||||||
|
uint8_t* packet;
|
||||||
|
int message_len;
|
||||||
|
char* fail_str;
|
||||||
|
|
||||||
|
if(GWBUF_LENGTH(buf) <= MYSQL_DATABASE_MAXLEN - 5)
|
||||||
|
{
|
||||||
|
packet = GWBUF_DATA(buf);
|
||||||
|
/** Copy database name from MySQL packet to session */
|
||||||
|
strncpy(rses->rses_mysql_session->db,
|
||||||
|
(char*)(packet + 5),
|
||||||
|
(int)(GWBUF_LENGTH(buf) - 5));
|
||||||
|
/**
|
||||||
|
* Update the session's active database only if it's in the hashtable.
|
||||||
|
* If it isn't found, send a custom error packet to the client.
|
||||||
|
*/
|
||||||
|
if(hashtable_fetch(
|
||||||
|
inst->dbnames_hash,
|
||||||
|
(char*)rses->rses_mysql_session->db) == NULL)
|
||||||
|
{
|
||||||
|
/** Create error message */
|
||||||
|
message_len = 25 + MYSQL_DATABASE_MAXLEN;
|
||||||
|
fail_str = calloc(1, message_len+1);
|
||||||
|
snprintf(fail_str,
|
||||||
|
message_len,
|
||||||
|
"Unknown database '%s'",
|
||||||
|
(char*)rses->rses_mysql_session->db);
|
||||||
|
rses->rses_mysql_session->db[0] = '\0';
|
||||||
|
succp = false;
|
||||||
|
goto reply_error;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
succp = true;
|
||||||
|
goto retblock;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/** Create error message */
|
||||||
|
message_len = 25 + MYSQL_DATABASE_MAXLEN;
|
||||||
|
fail_str = calloc(1, message_len+1);
|
||||||
|
snprintf(fail_str,
|
||||||
|
message_len,
|
||||||
|
"Unknown database '%s'",
|
||||||
|
(char*)rses->rses_mysql_session->db);
|
||||||
|
succp = false;
|
||||||
|
goto reply_error;
|
||||||
|
}
|
||||||
|
reply_error:
|
||||||
|
{
|
||||||
|
GWBUF* errbuf;
|
||||||
|
errbuf = modutil_create_mysql_err_msg(2, 0, 1049, "42000", fail_str);
|
||||||
|
free(fail_str);
|
||||||
|
|
||||||
|
if (errbuf == NULL)
|
||||||
|
{
|
||||||
|
LOGIF(LE, (skygw_log_write_flush(
|
||||||
|
LOGFILE_ERROR,
|
||||||
|
"Error : Creating buffer for error message failed.")));
|
||||||
|
goto retblock;
|
||||||
|
}
|
||||||
|
/** Set flags that help router to identify session commans reply */
|
||||||
|
gwbuf_set_type(errbuf, GWBUF_TYPE_MYSQL);
|
||||||
|
gwbuf_set_type(errbuf, GWBUF_TYPE_SESCMD_RESPONSE);
|
||||||
|
gwbuf_set_type(errbuf, GWBUF_TYPE_RESPONSE_END);
|
||||||
|
/**
|
||||||
|
* Create an incoming event for randomly selected backend DCB which
|
||||||
|
* will then be notified and replied 'back' to the client.
|
||||||
|
*/
|
||||||
|
poll_add_epollin_event_to_dcb(rses->rses_backend_ref->bref_dcb,
|
||||||
|
gwbuf_clone(errbuf));
|
||||||
|
gwbuf_free(errbuf);
|
||||||
|
}
|
||||||
|
retblock:
|
||||||
|
return succp;
|
||||||
|
}
|
@ -265,8 +265,9 @@ typedef enum skygw_chk_t {
|
|||||||
#define STRTARGET(t) (t == TARGET_ALL ? "TARGET_ALL" : \
|
#define STRTARGET(t) (t == TARGET_ALL ? "TARGET_ALL" : \
|
||||||
(t == TARGET_MASTER ? "TARGET_MASTER" : \
|
(t == TARGET_MASTER ? "TARGET_MASTER" : \
|
||||||
(t == TARGET_SLAVE ? "TARGET_SLAVE" : \
|
(t == TARGET_SLAVE ? "TARGET_SLAVE" : \
|
||||||
|
(t == TARGET_NAMED_SERVER ? "TARGET_NAMED_SERVER" : \
|
||||||
(t == TARGET_UNDEFINED ? "TARGET_UNDEFINED" : \
|
(t == TARGET_UNDEFINED ? "TARGET_UNDEFINED" : \
|
||||||
"Unknown target value"))))
|
"Unknown target value")))))
|
||||||
|
|
||||||
#define BREFSRV(b) (b->bref_backend->backend_server)
|
#define BREFSRV(b) (b->bref_backend->backend_server)
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user