From 463cd4a97e2342ffd9c5e3895cd4ebdb38b0f095 Mon Sep 17 00:00:00 2001 From: Markus Makela Date: Mon, 4 May 2015 07:04:31 +0300 Subject: [PATCH] Moved common sharding functions to a separate file. --- server/modules/include/sharding_common.h | 18 ++ server/modules/routing/CMakeLists.txt | 10 +- .../routing/schemarouter/CMakeLists.txt | 11 + .../routing/schemarouter/schemarouter.c | 228 +++--------------- .../routing/schemarouter/sharding_common.c | 161 +++++++++++++ .../routing/schemarouter/shardrouter.c | 212 ++-------------- server/modules/routing/schemarouter/svcconn.c | 78 ++++++ 7 files changed, 320 insertions(+), 398 deletions(-) create mode 100644 server/modules/include/sharding_common.h create mode 100644 server/modules/routing/schemarouter/CMakeLists.txt create mode 100644 server/modules/routing/schemarouter/sharding_common.c create mode 100644 server/modules/routing/schemarouter/svcconn.c diff --git a/server/modules/include/sharding_common.h b/server/modules/include/sharding_common.h new file mode 100644 index 000000000..a401d2e31 --- /dev/null +++ b/server/modules/include/sharding_common.h @@ -0,0 +1,18 @@ +#ifndef _SHARDING_COMMON_HG +#define _SHARDING_COMMON_HG + +#include +#include +#include +#include +#include +#include +#include + +bool extract_database(GWBUF* buf, char* str); +void create_error_reply(char* fail_str,DCB* dcb); +bool change_current_db(MYSQL_session* mysql_session, + HASHTABLE* dbhash, + GWBUF* buf); + +#endif diff --git a/server/modules/routing/CMakeLists.txt b/server/modules/routing/CMakeLists.txt index 77abe6cd1..02535b7b9 100644 --- a/server/modules/routing/CMakeLists.txt +++ b/server/modules/routing/CMakeLists.txt @@ -5,14 +5,6 @@ if(BUILD_TESTS) install(TARGETS testroute DESTINATION modules) endif() -add_library(schemarouter SHARED schemarouter/schemarouter.c) -target_link_libraries(schemarouter log_manager utils query_classifier) -install(TARGETS schemarouter DESTINATION modules) - -add_library(shardrouter SHARED schemarouter/shardrouter.c) -target_link_libraries(shardrouter log_manager utils query_classifier) -install(TARGETS shardrouter DESTINATION modules) - add_library(readconnroute SHARED readconnroute.c) target_link_libraries(readconnroute log_manager utils) install(TARGETS readconnroute DESTINATION modules) @@ -26,7 +18,7 @@ target_link_libraries(cli log_manager utils) install(TARGETS cli DESTINATION modules) add_subdirectory(readwritesplit) -add_subdirectory(schemarouter/test) +add_subdirectory(schemarouter) if(BUILD_BINLOG) add_subdirectory(binlog) endif() diff --git a/server/modules/routing/schemarouter/CMakeLists.txt b/server/modules/routing/schemarouter/CMakeLists.txt new file mode 100644 index 000000000..70e6348af --- /dev/null +++ b/server/modules/routing/schemarouter/CMakeLists.txt @@ -0,0 +1,11 @@ +add_library(schemarouter SHARED schemarouter.c sharding_common.c) +target_link_libraries(schemarouter log_manager utils query_classifier) +install(TARGETS schemarouter DESTINATION modules) + +add_library(shardrouter SHARED shardrouter.c svcconn.c sharding_common.c) +target_link_libraries(shardrouter log_manager utils query_classifier) +install(TARGETS shardrouter DESTINATION modules) + +if(BUILD_TESTS) + add_subdirectory(test) +endif() diff --git a/server/modules/routing/schemarouter/schemarouter.c b/server/modules/routing/schemarouter/schemarouter.c index 2dfe7ddfb..78a6b5d4a 100644 --- a/server/modules/routing/schemarouter/schemarouter.c +++ b/server/modules/routing/schemarouter/schemarouter.c @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -208,12 +209,6 @@ static ROUTER_INSTANCE* instances; static int hashkeyfun(void* key); static int hashcmpfun (void *, void *); -static bool change_current_db( - ROUTER_INSTANCE* inst, - ROUTER_CLIENT_SES* rses, - GWBUF* buf); - - static int hashkeyfun(void* key) { if(key == NULL){ @@ -250,7 +245,7 @@ char* get_lenenc_str(void* data, int* len) { unsigned char* ptr = (unsigned char*)data; char* rval; - long size, offset; + unsigned long size, offset; if(data == NULL || len == NULL) { @@ -279,9 +274,14 @@ char* get_lenenc_str(void* data, int* len) offset = 3; break; case 0xfe: - size = *ptr + ((*(ptr + 2) << 8)) + (*(ptr + 3) << 16) + - (*(ptr + 4) << 24) + (*(ptr + 5) << 32) + (*(ptr + 6) << 40) + - (*(ptr + 7) << 48) + (*(ptr + 8) << 56); + size = *ptr + + ((*(ptr + 2) << 8)) + + (*(ptr + 3) << 16) + + (*(ptr + 4) << 24) + + ((unsigned long)*(ptr + 5) << 32) + + ((unsigned long)*(ptr + 6) << 40) + + ((unsigned long)*(ptr + 7) << 48) + + ((unsigned long)*(ptr + 8) << 56); offset = 8; break; default: @@ -1837,7 +1837,7 @@ static int routeQuery( skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN; mysql_server_cmd_t packet_type; uint8_t* packet; - int ret = 0; + int i,ret = 0; DCB* target_dcb = NULL; ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance; ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session; @@ -1847,7 +1847,8 @@ static int routeQuery( bool succp = false; char* tname = NULL; GWBUF* querybuf = qbuf; - + char db[MYSQL_DATABASE_MAXLEN + 1]; + char errbuf[26+MYSQL_DATABASE_MAXLEN]; CHK_CLIENT_RSES(router_cli_ses); /** Dirty read for quick check if router is closed. */ @@ -2020,12 +2021,25 @@ static int routeQuery( if (packet_type == MYSQL_COM_INIT_DB || op == QUERY_OP_CHANGE_DB) { - if (!(change_successful = change_current_db(inst, router_cli_ses, querybuf))) + if (!(change_successful = change_current_db(router_cli_ses->rses_mysql_session, + router_cli_ses->dbhash, + querybuf))) + { + extract_database(querybuf,db); + snprintf(errbuf,25+MYSQL_DATABASE_MAXLEN,"Unknown database: %s",db); + for(i = 0;irses_nbackends;i++) { - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "Error : Changing database failed."))); + if(SERVER_IS_RUNNING(router_cli_ses->rses_backend_ref[i].bref_backend->backend_server)) + { + create_error_reply(errbuf,router_cli_ses->rses_backend_ref[i].bref_dcb); + break; + } } + + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Changing database failed."))); + } } if(QUERY_IS_TYPE(qtype, QUERY_TYPE_SHOW_DATABASES)) @@ -2755,8 +2769,7 @@ static void clientReply ( /** There is one pending session command to be executed. */ if (sescmd_cursor_is_active(scur)) { - bool succp; - + LOGIF(LT, (skygw_log_write( LOGFILE_TRACE, "Backend %s:%d processed reply and starts to execute " @@ -2764,9 +2777,7 @@ static void clientReply ( bref->bref_backend->backend_server->name, bref->bref_backend->backend_server->port))); - succp = execute_sescmd_in_backend(bref); - - ss_dassert(succp); + execute_sescmd_in_backend(bref); } else if (bref->bref_pending_cmd != NULL) /*< non-sescmd is waiting to be routed */ { @@ -4320,8 +4331,6 @@ router_handle_state_switch( { backend_ref_t* bref; int rc = 1; - ROUTER_CLIENT_SES* rses; - SESSION* ses; SERVER* srv; CHK_DCB(dcb); @@ -4334,12 +4343,7 @@ router_handle_state_switch( { goto return_rc; } - ses = dcb->session; - CHK_SESSION(ses); - rses = (ROUTER_CLIENT_SES *) dcb->session->router_session; - CHK_CLIENT_RSES(rses); - switch(reason) { case DCB_REASON_NOT_RESPONDING: @@ -4369,172 +4373,4 @@ static sescmd_cursor_t* backend_ref_get_sescmd_cursor ( return scur; } -/** - * Read new database name from MYSQL_COM_INIT_DB packet, check that it exists - * in the hashtable and copy its name to MYSQL_session. - * - * @param inst Router instance - * @param rses Router client session - * @param buf Query buffer - * - * @return true if new database is set, false if non-existent database was tried - * to be set - */ -static bool change_current_db( - ROUTER_INSTANCE* inst, - ROUTER_CLIENT_SES* rses, - GWBUF* buf) -{ - bool succp; - uint8_t* packet; - unsigned int plen; - int message_len; - char* fail_str,*target; - - if(GWBUF_LENGTH(buf) <= MYSQL_DATABASE_MAXLEN - 5) - { - packet = GWBUF_DATA(buf); - plen = gw_mysql_get_byte3(packet) - 1; - /** Copy database name from MySQL packet to session */ - if(query_classifier_get_operation(buf) == QUERY_OP_CHANGE_DB) - { - char* query = modutil_get_SQL(buf); - char *saved,*tok; - - tok = strtok_r(query," ;",&saved); - if(tok == NULL || strcasecmp(tok,"use") != 0) - { - skygw_log_write(LOGFILE_ERROR,"Schemarouter: Malformed chage database packet."); - free(query); - message_len = 25 + MYSQL_DATABASE_MAXLEN; - fail_str = calloc(1, message_len+1); - snprintf(fail_str, - message_len, - "Unknown database '%s'", - (char*)rses->rses_mysql_session->db); - rses->rses_mysql_session->db[0] = '\0'; - succp = false; - goto reply_error; - } - - tok = strtok_r(NULL," ;",&saved); - if(tok == NULL) - { - skygw_log_write(LOGFILE_ERROR,"Schemarouter: Malformed chage database packet."); - free(query); - message_len = 25 + MYSQL_DATABASE_MAXLEN; - fail_str = calloc(1, message_len+1); - snprintf(fail_str, - message_len, - "Unknown database '%s'", - (char*)rses->rses_mysql_session->db); - rses->rses_mysql_session->db[0] = '\0'; - succp = false; - goto reply_error; - } - - strncpy(rses->rses_mysql_session->db,tok,MYSQL_DATABASE_MAXLEN); - free(query); - query = NULL; - - } - else - { - memcpy(rses->rses_mysql_session->db,packet + 5,plen); - memset(rses->rses_mysql_session->db + plen,0,1); - } - skygw_log_write(LOGFILE_TRACE,"schemarouter: INIT_DB with database '%s'", - rses->rses_mysql_session->db); - /** - * Update the session's active database only if it's in the hashtable. - * If it isn't found, send a custom error packet to the client. - */ - - - if((target = (char*)hashtable_fetch( - rses->dbhash, - (char*)rses->rses_mysql_session->db)) == NULL) - { - - /** Create error message */ - message_len = 25 + MYSQL_DATABASE_MAXLEN; - fail_str = calloc(1, message_len+1); - snprintf(fail_str, - message_len, - "Unknown database '%s'", - (char*)rses->rses_mysql_session->db); - rses->rses_mysql_session->db[0] = '\0'; - succp = false; - goto reply_error; - } - else - { - skygw_log_write(LOGFILE_TRACE,"schemarouter: database is on server: '%s'.",target); - succp = true; - goto retblock; - } - } - else - { - /** Create error message */ - skygw_log_write_flush(LOGFILE_ERROR, - "schemarouter: failed to change database: Query buffer too large"); - skygw_log_write_flush(LOGFILE_TRACE, - "schemarouter: failed to change database: Query buffer too large [%d bytes]",GWBUF_LENGTH(buf)); - message_len = 25 + MYSQL_DATABASE_MAXLEN; - fail_str = calloc(1, message_len+1); - snprintf(fail_str, - message_len, - "Unknown database '%s'", - (char*)rses->rses_mysql_session->db); - succp = false; - goto reply_error; - } -reply_error: - { - GWBUF* errbuf; - skygw_log_write_flush( - LOGFILE_TRACE, - "schemarouter: failed to change database: %s", fail_str); - errbuf = modutil_create_mysql_err_msg(1, 0, 1049, "42000", fail_str); - free(fail_str); - - - if (errbuf == NULL) - { - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "Error : Creating buffer for error message failed."))); - goto retblock; - } - /** Set flags that help router to identify session commans reply */ - gwbuf_set_type(errbuf, GWBUF_TYPE_MYSQL); - gwbuf_set_type(errbuf, GWBUF_TYPE_SESCMD_RESPONSE); - gwbuf_set_type(errbuf, GWBUF_TYPE_RESPONSE_END); - /** - * Create an incoming event for randomly selected backend DCB which - * will then be notified and replied 'back' to the client. - */ - DCB *dcb = NULL; - int i; - for(i = 0;irses_nbackends;i++) - { - if(rses->rses_backend_ref[i].bref_dcb){ - dcb = rses->rses_backend_ref[i].bref_dcb; - break; - } - } - - if(dcb == NULL) - { - skygw_log_write_flush(LOGFILE_ERROR,"Error : All backend connections are down."); - return false; - } - poll_add_epollin_event_to_dcb(rses->dcb_reply, - gwbuf_clone(errbuf)); - gwbuf_free(errbuf); - } -retblock: - return succp; -} diff --git a/server/modules/routing/schemarouter/sharding_common.c b/server/modules/routing/schemarouter/sharding_common.c new file mode 100644 index 000000000..d69b12d02 --- /dev/null +++ b/server/modules/routing/schemarouter/sharding_common.c @@ -0,0 +1,161 @@ +/* + * This file is distributed as part of the MariaDB Corporation MaxScale. It is free + * software: you can redistribute it and/or modify it under the terms of the + * GNU General Public License as published by the Free Software Foundation, + * version 2. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Copyright MariaDB Corporation Ab 2013-2015 + */ + +#include + +/** Defined in log_manager.cc */ +extern int lm_enabled_logfiles_bitmask; +extern size_t log_ses_count[]; +extern __thread log_info_t tls_log_info; + +/** + * Extract the database name from a COM_INIT_DB or literal USE ... query. + * @param buf Buffer with the database change query + * @param str Pointer where the database name is copied + * @return True for success, false for failure + */ +bool extract_database(GWBUF* buf, char* str) +{ + uint8_t* packet; + char *saved,*tok,*query = NULL; + bool succp = true; + unsigned int plen; + + packet = GWBUF_DATA(buf); + plen = gw_mysql_get_byte3(packet) - 1; + + /** Copy database name from MySQL packet to session */ + if(query_classifier_get_operation(buf) == QUERY_OP_CHANGE_DB) + { + query = modutil_get_SQL(buf); + tok = strtok_r(query," ;",&saved); + if(tok == NULL || strcasecmp(tok,"use") != 0) + { + skygw_log_write(LOGFILE_ERROR,"Schemarouter: Malformed chage database packet."); + succp = false; + goto retblock; + } + + tok = strtok_r(NULL," ;",&saved); + if(tok == NULL) + { + skygw_log_write(LOGFILE_ERROR,"Schemarouter: Malformed chage database packet."); + succp = false; + goto retblock; + } + + strncpy(str,tok,MYSQL_DATABASE_MAXLEN); + } + else + { + memcpy(str,packet + 5,plen); + memset(str + plen,0,1); + } + retblock: + free(query); + return succp; +} + +/** + * Create a fake error message from a DCB. + * @param fail_str Custom error message + * @param dcb DCB to use as the origin of the error + */ +void create_error_reply(char* fail_str,DCB* dcb) +{ + skygw_log_write_flush( + LOGFILE_TRACE, + "change_current_db: failed to change database: %s", fail_str); + GWBUF* errbuf = modutil_create_mysql_err_msg(1, 0, 1049, "42000", fail_str); + + if (errbuf == NULL) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Creating buffer for error message failed."))); + return; + } + /** Set flags that help router to identify session commands reply */ + gwbuf_set_type(errbuf, GWBUF_TYPE_MYSQL); + gwbuf_set_type(errbuf, GWBUF_TYPE_SESCMD_RESPONSE); + gwbuf_set_type(errbuf, GWBUF_TYPE_RESPONSE_END); + + poll_add_epollin_event_to_dcb(dcb, + errbuf); +} + +/** + * Read new database name from MYSQL_COM_INIT_DB packet or a literal USE ... COM_QUERY packet, check that it exists + * in the hashtable and copy its name to MYSQL_session. + * + * @param mysql_session The MySQL session structure + * @param dbhash Hashtable containing valid databases + * @param buf Buffer containing the database change query + * + * @return true if new database is set, false if non-existent database was tried + * to be set + */ +bool change_current_db(MYSQL_session* mysql_session, + HASHTABLE* dbhash, + GWBUF* buf) +{ + char* target; + bool succp; + char db[MYSQL_DATABASE_MAXLEN+1]; + if(GWBUF_LENGTH(buf) <= MYSQL_DATABASE_MAXLEN - 5) + { + /** Copy database name from MySQL packet to session */ + if(!extract_database(buf,db)) + { + succp = false; + goto retblock; + } + skygw_log_write(LOGFILE_TRACE,"change_current_db: INIT_DB with database '%s'", + db); + /** + * Update the session's active database only if it's in the hashtable. + * If it isn't found, send a custom error packet to the client. + */ + + if((target = (char*)hashtable_fetch(dbhash,(char*)db)) == NULL) + { + succp = false; + goto retblock; + } + else + { + strncpy(mysql_session->db,db,MYSQL_DATABASE_MAXLEN); + skygw_log_write(LOGFILE_TRACE,"change_current_db: database is on server: '%s'.",target); + succp = true; + goto retblock; + } + } + else + { + /** Create error message */ + skygw_log_write_flush(LOGFILE_ERROR, + "change_current_db: failed to change database: Query buffer too large"); + skygw_log_write_flush(LOGFILE_TRACE, + "change_current_db: failed to change database: Query buffer too large [%d bytes]",GWBUF_LENGTH(buf)); + succp = false; + goto retblock; + } + + retblock: + return succp; +} diff --git a/server/modules/routing/schemarouter/shardrouter.c b/server/modules/routing/schemarouter/shardrouter.c index a3c9da3c4..0d0aa6c84 100644 --- a/server/modules/routing/schemarouter/shardrouter.c +++ b/server/modules/routing/schemarouter/shardrouter.c @@ -13,8 +13,9 @@ * this program; if not, write to the Free Software Foundation, Inc., 51 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. * - * Copyright MariaDB Corporation Ab 2013-2014 + * Copyright MariaDB Corporation Ab 2013-2015 */ + #include #include #include @@ -24,6 +25,7 @@ #include #include +#include #include #include #include @@ -119,9 +121,9 @@ static route_target_t get_shard_route_target( static uint8_t getCapabilities(ROUTER* inst, void* router_session); -static void subsvc_clear_state(SUBSERVICE* svc,subsvc_state_t state); -static void subsvc_set_state(SUBSERVICE* svc,subsvc_state_t state); -static bool get_shard_subsvc(SUBSERVICE** subsvc,ROUTER_CLIENT_SES* session,char* target); +void subsvc_clear_state(SUBSERVICE* svc,subsvc_state_t state); +void subsvc_set_state(SUBSERVICE* svc,subsvc_state_t state); +bool get_shard_subsvc(SUBSERVICE** subsvc,ROUTER_CLIENT_SES* session,char* target); static ROUTER_OBJECT MyObject = { createInstance, @@ -205,19 +207,6 @@ static void refreshInstance( CONFIG_PARAMETER* param); static int router_handle_state_switch(DCB* dcb, DCB_REASON reason, void* data); -/* -static bool handle_error_new_connection( - ROUTER_INSTANCE* inst, - ROUTER_CLIENT_SES* rses, - DCB* backend_dcb, - GWBUF* errmsg); -static void handle_error_reply_client( - SESSION* ses, - ROUTER_CLIENT_SES* rses, - DCB* backend_dcb, - GWBUF* errmsg); -*/ - static SPINLOCK instlock; static ROUTER_INSTANCE* instances; @@ -225,11 +214,6 @@ static ROUTER_INSTANCE* instances; static int hashkeyfun(void* key); static int hashcmpfun(void *, void *); -static bool change_current_db( - ROUTER_INSTANCE* inst, - ROUTER_CLIENT_SES* rses, - GWBUF* buf); - static int hashkeyfun(void* key) { @@ -1191,7 +1175,7 @@ newSession( atomic_add(&client_rses->rses_versno, 2); ss_dassert(client_rses->rses_versno == 2); - client_rses->dbhash = hashtable_alloc(100, hashkeyfun, hashcmpfun); + client_rses->dbhash = hashtable_alloc(100, simple_str_hash,strcmp); hashtable_memory_fns(client_rses->dbhash, (HASHMEMORYFN) strdup, (HASHMEMORYFN) strdup, (HASHMEMORYFN) free, @@ -1564,7 +1548,7 @@ routeQuery(ROUTER* instance, skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN; mysql_server_cmd_t packet_type; uint8_t* packet; - int ret = 1; + int i,ret = 1; SUBSERVICE* target_subsvc; ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *) instance; ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *) router_session; @@ -1573,7 +1557,10 @@ routeQuery(ROUTER* instance, route_target_t route_target = TARGET_UNDEFINED; bool succp = false; char* tname = NULL; - skygw_log_write_flush(LOGFILE_TRACE,"shardrouter: routeQuery"); + char db[MYSQL_DATABASE_MAXLEN + 1]; + char errbuf[26+MYSQL_DATABASE_MAXLEN]; + + skygw_log_write_flush(LOGFILE_DEBUG,"shardrouter: routeQuery"); CHK_CLIENT_RSES(router_cli_ses); /** Dirty read for quick check if router is closed. */ @@ -1714,8 +1701,13 @@ routeQuery(ROUTER* instance, if(packet_type == MYSQL_COM_INIT_DB) { - if(!(change_successful = change_current_db(inst, router_cli_ses, querybuf))) + if(!(change_successful = change_current_db(router_cli_ses->rses_mysql_session, + router_cli_ses->dbhash, + querybuf))) { + extract_database(querybuf,db); + snprintf(errbuf,"Unknown database: %s",db); + create_error_reply(errbuf,router_cli_ses->replydcb); LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, "Error : Changing database failed."))); @@ -2066,34 +2058,6 @@ clientReply( return; } - -static void -subsvc_set_state(SUBSERVICE* svc,subsvc_state_t state) -{ - if(state & SUBSVC_WAITING_RESULT) - { - - /** Increase waiter count */ - atomic_add(&svc->n_res_waiting, 1); - } - - svc->state |= state; -} - -static void -subsvc_clear_state(SUBSERVICE* svc,subsvc_state_t state) -{ - - - if(state & SUBSVC_WAITING_RESULT) - { - /** Decrease waiter count */ - atomic_add(&svc->n_res_waiting, -1); - } - - svc->state &= ~state; -} - /** * Create a generic router session property strcture. */ @@ -2634,8 +2598,7 @@ mysql_sescmd_get_property( * capabilities specified, rc > 0 when there are capabilities. */ static uint8_t -getCapabilities( - ROUTER* inst, +getCapabilities(ROUTER* inst, void* router_session) { ROUTER_CLIENT_SES* rses = (ROUTER_CLIENT_SES *) router_session; @@ -2905,37 +2868,7 @@ handleError( } -static bool -get_shard_subsvc(SUBSERVICE** subsvc,ROUTER_CLIENT_SES* session,char* target) -{ - int i; - - if(subsvc == NULL || session == NULL || target == NULL) - return false; - for(i = 0;in_subservice;i++) - { - if(strcmp(session->subservice[i]->service->name,target) == 0) - { - - if (SUBSVC_IS_OK(session->subservice[i])) - { - if(subsvc_is_valid(session->subservice[i])){ - *subsvc = session->subservice[i]; - return true; - } - - /** - * The service has failed - */ - - subsvc_set_state(session->subservice[i],SUBSVC_FAILED); - } - } - } - - return false; -} /** * Finds the subservice who owns this session. * @param rses Router client session @@ -3003,110 +2936,3 @@ return_rc: return rc; #endif } - -/** - * Read new database nbame from MYSQL_COM_INIT_DB packet, check that it exists - * in the hashtable and copy its name to MYSQL_session. - * - * @param inst Router instance - * @param rses Router client session - * @param buf Query buffer - * - * @return true if new database is set, false if non-existent database was tried - * to be set - */ -static bool -change_current_db( - ROUTER_INSTANCE* inst, - ROUTER_CLIENT_SES* rses, - GWBUF* buf) -{ - bool succp; - uint8_t* packet; - unsigned int plen; - int message_len; - char* fail_str; - - if(GWBUF_LENGTH(buf) <= MYSQL_DATABASE_MAXLEN - 5) - { - packet = GWBUF_DATA(buf); - plen = gw_mysql_get_byte3(packet) - 1; - - /** Copy database name from MySQL packet to session */ - - memcpy(rses->rses_mysql_session->db, - packet + 5, - plen); - memset(rses->rses_mysql_session->db + plen, 0, 1); - - /** - * Update the session's active database only if it's in the hashtable. - * If it isn't found, send a custom error packet to the client. - */ - - if(hashtable_fetch( - rses->dbhash, - (char*) rses->rses_mysql_session->db) == NULL) - { - - /** Create error message */ - message_len = 25 + MYSQL_DATABASE_MAXLEN; - fail_str = calloc(1, message_len + 1); - snprintf(fail_str, - message_len, - "Unknown database '%s'", - (char*) rses->rses_mysql_session->db); - rses->rses_mysql_session->db[0] = '\0'; - succp = false; - goto reply_error; - } - else - { - succp = true; - goto retblock; - } - } - else - { - /** Create error message */ - message_len = 25 + MYSQL_DATABASE_MAXLEN; - fail_str = calloc(1, message_len + 1); - snprintf(fail_str, - message_len, - "Unknown database '%s'", - (char*) rses->rses_mysql_session->db); - succp = false; - goto reply_error; - } -reply_error: - { - GWBUF* errbuf; - skygw_log_write_flush( - LOGFILE_TRACE, - "shardrouter: failed to change database: %s", fail_str); - errbuf = modutil_create_mysql_err_msg(1, 0, 1049, "42000", fail_str); - errbuf = modutil_create_mysql_err_msg(1, 0, 1049, "42000", fail_str); - free(fail_str); - - if(errbuf == NULL) - { - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "Error : Creating buffer for error message failed."))); - goto retblock; - } - /** Set flags that help router to identify session commans reply */ - gwbuf_set_type(errbuf, GWBUF_TYPE_MYSQL); - gwbuf_set_type(errbuf, GWBUF_TYPE_SESCMD_RESPONSE); - gwbuf_set_type(errbuf, GWBUF_TYPE_RESPONSE_END); - /** - * Create an incoming event for randomly selected backend DCB which - * will then be notified and replied 'back' to the client. - */ - poll_add_epollin_event_to_dcb(rses->replydcb, - gwbuf_clone(errbuf)); - gwbuf_free(errbuf); - } -retblock: - return succp; -} diff --git a/server/modules/routing/schemarouter/svcconn.c b/server/modules/routing/schemarouter/svcconn.c new file mode 100644 index 000000000..2d8b07a2a --- /dev/null +++ b/server/modules/routing/schemarouter/svcconn.c @@ -0,0 +1,78 @@ +/* + * This file is distributed as part of the MariaDB Corporation MaxScale. It is free + * software: you can redistribute it and/or modify it under the terms of the + * GNU General Public License as published by the Free Software Foundation, + * version 2. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Copyright MariaDB Corporation Ab 2013-2014 + */ + +#include + +void +subsvc_set_state(SUBSERVICE* svc,subsvc_state_t state) +{ + if(state & SUBSVC_WAITING_RESULT) + { + + /** Increase waiter count */ + atomic_add(&svc->n_res_waiting, 1); + } + + svc->state |= state; +} + +void +subsvc_clear_state(SUBSERVICE* svc,subsvc_state_t state) +{ + + + if(state & SUBSVC_WAITING_RESULT) + { + /** Decrease waiter count */ + atomic_add(&svc->n_res_waiting, -1); + } + + svc->state &= ~state; +} + +bool +get_shard_subsvc(SUBSERVICE** subsvc,ROUTER_CLIENT_SES* session,char* target) +{ + int i; + + if(subsvc == NULL || session == NULL || target == NULL) + return false; + + for(i = 0;in_subservice;i++) + { + if(strcmp(session->subservice[i]->service->name,target) == 0) + { + + if (SUBSVC_IS_OK(session->subservice[i])) + { + if(subsvc_is_valid(session->subservice[i])){ + *subsvc = session->subservice[i]; + return true; + } + + /** + * The service has failed + */ + + subsvc_set_state(session->subservice[i],SUBSVC_FAILED); + } + } + } + + return false; +}