diff --git a/server/modules/include/dbshard.h b/server/modules/include/dbshard.h new file mode 100644 index 000000000..131a22f23 --- /dev/null +++ b/server/modules/include/dbshard.h @@ -0,0 +1,329 @@ +#ifndef _DBSHARDROUTER_H +#define _DBSHARDROUTER_H +/* + * This file is distributed as part of the MariaDB Corporation MaxScale. It is free + * software: you can redistribute it and/or modify it under the terms of the + * GNU General Public License as published by the Free Software Foundation, + * version 2. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Copyright MariaDB Corporation Ab 2013-2014 + */ + +/** + * @file router.h - The dbshard router module heder file + * + * @verbatim + * Revision History + * + * See GitHub https://github.com/skysql/MaxScale + * + * @endverbatim + */ + +#include +#include + +#undef PREP_STMT_CACHING + +#if defined(PREP_STMT_CACHING) + +typedef enum prep_stmt_type { + PREP_STMT_NAME, + PREP_STMT_ID +} prep_stmt_type_t; + +typedef enum prep_stmt_state { + PREP_STMT_ALLOC, + PREP_STMT_SENT, + PREP_STMT_RECV, + PREP_STMT_DROPPED +} prep_stmt_state_t; + +#endif /*< PREP_STMT_CACHING */ + +typedef enum bref_state { + BREF_IN_USE = 0x01, + BREF_WAITING_RESULT = 0x02, /*< for session commands only */ + BREF_QUERY_ACTIVE = 0x04, /*< for other queries */ + BREF_CLOSED = 0x08 +} bref_state_t; + +#define BREF_IS_NOT_USED(s) ((s)->bref_state & ~BREF_IN_USE) +#define BREF_IS_IN_USE(s) ((s)->bref_state & BREF_IN_USE) +#define BREF_IS_WAITING_RESULT(s) ((s)->bref_num_result_wait > 0) +#define BREF_IS_QUERY_ACTIVE(s) ((s)->bref_state & BREF_QUERY_ACTIVE) +#define BREF_IS_CLOSED(s) ((s)->bref_state & BREF_CLOSED) + +typedef enum backend_type_t { + BE_UNDEFINED=-1, + BE_MASTER, + BE_JOINED = BE_MASTER, + BE_SLAVE, + BE_COUNT +} backend_type_t; + +struct router_instance; + +typedef enum { + TARGET_UNDEFINED = 0x00, + TARGET_MASTER = 0x01, + TARGET_SLAVE = 0x02, + TARGET_NAMED_SERVER = 0x04, + TARGET_ALL = 0x08, + TARGET_RLAG_MAX = 0x10 +} route_target_t; + +#define TARGET_IS_MASTER(t) (t & TARGET_MASTER) +#define TARGET_IS_SLAVE(t) (t & TARGET_SLAVE) +#define TARGET_IS_NAMED_SERVER(t) (t & TARGET_NAMED_SERVER) +#define TARGET_IS_ALL(t) (t & TARGET_ALL) +#define TARGET_IS_RLAG_MAX(t) (t & TARGET_RLAG_MAX) + +typedef struct rses_property_st rses_property_t; +typedef struct router_client_session ROUTER_CLIENT_SES; + +typedef enum rses_property_type_t { + RSES_PROP_TYPE_UNDEFINED=-1, + RSES_PROP_TYPE_SESCMD=0, + RSES_PROP_TYPE_FIRST = RSES_PROP_TYPE_SESCMD, + RSES_PROP_TYPE_TMPTABLES, + RSES_PROP_TYPE_LAST=RSES_PROP_TYPE_TMPTABLES, + RSES_PROP_TYPE_COUNT=RSES_PROP_TYPE_LAST+1 +} rses_property_type_t; + + + +/** + * This criteria is used when backends are chosen for a router session's use. + * Backend servers are sorted to ascending order according to the criteria + * and top N are chosen. + */ +typedef enum select_criteria { + UNDEFINED_CRITERIA=0, + LEAST_GLOBAL_CONNECTIONS, /*< all connections established by MaxScale */ + LEAST_ROUTER_CONNECTIONS, /*< connections established by this router */ + LEAST_BEHIND_MASTER, + LEAST_CURRENT_OPERATIONS, + DEFAULT_CRITERIA=LEAST_CURRENT_OPERATIONS, + LAST_CRITERIA /*< not used except for an index */ +} select_criteria_t; + + +/** default values for rwsplit configuration parameters */ +#define CONFIG_MAX_SLAVE_CONN 1 +#define CONFIG_MAX_SLAVE_RLAG -1 /*< not used */ +#define CONFIG_SQL_VARIABLES_IN TYPE_ALL + +#define GET_SELECT_CRITERIA(s) \ + (strncmp(s,"LEAST_GLOBAL_CONNECTIONS", strlen("LEAST_GLOBAL_CONNECTIONS")) == 0 ? \ + LEAST_GLOBAL_CONNECTIONS : ( \ + strncmp(s,"LEAST_BEHIND_MASTER", strlen("LEAST_BEHIND_MASTER")) == 0 ? \ + LEAST_BEHIND_MASTER : ( \ + strncmp(s,"LEAST_ROUTER_CONNECTIONS", strlen("LEAST_ROUTER_CONNECTIONS")) == 0 ? \ + LEAST_ROUTER_CONNECTIONS : ( \ + strncmp(s,"LEAST_CURRENT_OPERATIONS", strlen("LEAST_CURRENT_OPERATIONS")) == 0 ? \ + LEAST_CURRENT_OPERATIONS : UNDEFINED_CRITERIA)))) + +/** + * Session variable command + */ +typedef struct mysql_sescmd_st { +#if defined(SS_DEBUG) + skygw_chk_t my_sescmd_chk_top; +#endif + rses_property_t* my_sescmd_prop; /*< parent property */ + GWBUF* my_sescmd_buf; /*< query buffer */ + unsigned char my_sescmd_packet_type;/*< packet type */ + bool my_sescmd_is_replied; /*< is cmd replied to client */ +#if defined(SS_DEBUG) + skygw_chk_t my_sescmd_chk_tail; +#endif +} mysql_sescmd_t; + + +/** + * Property structure + */ +struct rses_property_st { +#if defined(SS_DEBUG) + skygw_chk_t rses_prop_chk_top; +#endif + ROUTER_CLIENT_SES* rses_prop_rsession; /*< parent router session */ + int rses_prop_refcount; + rses_property_type_t rses_prop_type; + union rses_prop_data { + mysql_sescmd_t sescmd; + HASHTABLE* temp_tables; + } rses_prop_data; + rses_property_t* rses_prop_next; /*< next property of same type */ +#if defined(SS_DEBUG) + skygw_chk_t rses_prop_chk_tail; +#endif +}; + +typedef struct sescmd_cursor_st { +#if defined(SS_DEBUG) + skygw_chk_t scmd_cur_chk_top; +#endif + ROUTER_CLIENT_SES* scmd_cur_rses; /*< pointer to owning router session */ + rses_property_t** scmd_cur_ptr_property; /*< address of pointer to owner property */ + mysql_sescmd_t* scmd_cur_cmd; /*< pointer to current session command */ + bool scmd_cur_active; /*< true if command is being executed */ +#if defined(SS_DEBUG) + skygw_chk_t scmd_cur_chk_tail; +#endif +} sescmd_cursor_t; + +/** + * Internal structure used to define the set of backend servers we are routing + * connections to. This provides the storage for routing module specific data + * that is required for each of the backend servers. + * + * Owned by router_instance, referenced by each routing session. + */ +typedef struct backend_st { +#if defined(SS_DEBUG) + skygw_chk_t be_chk_top; +#endif + SERVER* backend_server; /*< The server itself */ + int backend_conn_count; /*< Number of connections to + * the server + */ + bool be_valid; /*< Valid when belongs to the + * router's configuration + */ + int weight; /*< Desired weighting on the + * load. Expressed in .1% + * increments + */ +#if defined(SS_DEBUG) + skygw_chk_t be_chk_tail; +#endif +} BACKEND; + + +/** + * Reference to BACKEND. + * + * Owned by router client session. + */ +typedef struct backend_ref_st { +#if defined(SS_DEBUG) + skygw_chk_t bref_chk_top; +#endif + BACKEND* bref_backend; + DCB* bref_dcb; + bref_state_t bref_state; + int bref_num_result_wait; + sescmd_cursor_t bref_sescmd_cur; + GWBUF* bref_pending_cmd; /*< For stmt which can't be routed due active sescmd execution */ +#if defined(SS_DEBUG) + skygw_chk_t bref_chk_tail; +#endif +} backend_ref_t; + + +typedef struct rwsplit_config_st { + int rw_max_slave_conn_percent; + int rw_max_slave_conn_count; + select_criteria_t rw_slave_select_criteria; + int rw_max_slave_replication_lag; + target_t rw_use_sql_variables_in; +} rwsplit_config_t; + + +#if defined(PREP_STMT_CACHING) + +typedef struct prep_stmt_st { +#if defined(SS_DEBUG) + skygw_chk_t pstmt_chk_top; +#endif + union id { + int seq; + char* name; + } pstmt_id; + prep_stmt_state_t pstmt_state; + prep_stmt_type_t pstmt_type; +#if defined(SS_DEBUG) + skygw_chk_t pstmt_chk_tail; +#endif +} prep_stmt_t; + +#endif /*< PREP_STMT_CACHING */ + +/** + * The client session structure used within this router. + */ +struct router_client_session { +#if defined(SS_DEBUG) + skygw_chk_t rses_chk_top; +#endif + SPINLOCK rses_lock; /*< protects rses_deleted */ + int rses_versno; /*< even = no active update, else odd. not used 4/14 */ + bool rses_closed; /*< true when closeSession is called */ + /** Properties listed by their type */ + rses_property_t* rses_properties[RSES_PROP_TYPE_COUNT]; + backend_ref_t* rses_master_ref; + backend_ref_t* rses_backend_ref; /*< Pointer to backend reference array */ + rwsplit_config_t rses_config; /*< copied config info from router instance */ + int rses_nbackends; + int rses_capabilities; /*< input type, for example */ + bool rses_autocommit_enabled; + bool rses_transaction_active; +#if defined(PREP_STMT_CACHING) + HASHTABLE* rses_prep_stmt[2]; +#endif + struct router_instance *router; /*< The router instance */ + struct router_client_session* next; +#if defined(SS_DEBUG) + skygw_chk_t rses_chk_tail; +#endif +}; + +/** + * The statistics for this router instance + */ +typedef struct { + int n_sessions; /*< Number sessions created */ + int n_queries; /*< Number of queries forwarded */ + int n_master; /*< Number of stmts sent to master */ + int n_slave; /*< Number of stmts sent to slave */ + int n_all; /*< Number of stmts sent to all */ +} ROUTER_STATS; + + +/** + * The per instance data for the router. + */ +typedef struct router_instance { + SERVICE* service; /*< Pointer to service */ + ROUTER_CLIENT_SES* connections; /*< List of client connections */ + SPINLOCK lock; /*< Lock for the instance data */ + BACKEND** servers; /*< Backend servers */ + BACKEND* master; /*< NULL or pointer */ + rwsplit_config_t rwsplit_config; /*< expanded config info from SERVICE */ + int rwsplit_version;/*< version number for router's config */ + unsigned int bitmask; /*< Bitmask to apply to server->status */ + unsigned int bitvalue; /*< Required value of server->status */ + ROUTER_STATS stats; /*< Statistics for this router */ + struct router_instance* next; /*< Next router on the list */ + bool available_slaves; /*< The router has some slaves available */ + HASHTABLE* dbnames_hash; /** Hashtable containing the database names and where to find them */ +} ROUTER_INSTANCE; + +#define BACKEND_TYPE(b) (SERVER_IS_MASTER((b)->backend_server) ? BE_MASTER : \ + (SERVER_IS_SLAVE((b)->backend_server) ? BE_SLAVE : BE_UNDEFINED)); + +void* dbnames_hash_init(BACKEND** backends); +bool update_dbnames_hash(BACKEND** backends, HASHTABLE* hashtable); + +#endif /*< _DBSHARDROUTER_H */ diff --git a/server/modules/routing/CMakeLists.txt b/server/modules/routing/CMakeLists.txt index 8208c470b..def9ca350 100644 --- a/server/modules/routing/CMakeLists.txt +++ b/server/modules/routing/CMakeLists.txt @@ -6,6 +6,10 @@ add_library(testroute SHARED testroute.c) target_link_libraries(testroute log_manager utils) install(TARGETS testroute DESTINATION modules) +add_library(dbshard SHARED dbshard/dbshard.c) +target_link_libraries(dbshard log_manager utils query_classifier) +install(TARGETS dbshard DESTINATION modules) + add_library(readconnroute SHARED readconnroute.c) target_link_libraries(readconnroute log_manager utils) install(TARGETS readconnroute DESTINATION modules) diff --git a/server/modules/routing/dbshard/dbshard.c b/server/modules/routing/dbshard/dbshard.c index 7a6e5b1cf..c306df50c 100644 --- a/server/modules/routing/dbshard/dbshard.c +++ b/server/modules/routing/dbshard/dbshard.c @@ -23,7 +23,7 @@ #include #include -#include +#include #include #include @@ -142,13 +142,21 @@ int bref_cmp_current_load( const void* bref1, const void* bref2); +int bref_cmp_none( + const void* bref1, + const void* bref2) +{ + return -1; +} + + /** * The order of functions _must_ match with the order the select criteria are * listed in select_criteria_t definition in readwritesplit.h */ int (*criteria_cmpfun[LAST_CRITERIA])(const void*, const void*)= { - NULL, + bref_cmp_none, bref_cmp_global_conn, bref_cmp_router_conn, bref_cmp_behind_master, @@ -172,11 +180,6 @@ static bool get_dcb( char* name, int max_rlag); -static void rwsplit_process_router_options( - ROUTER_INSTANCE* router, - char** options); - - static ROUTER_OBJECT MyObject = { createInstance, @@ -253,6 +256,11 @@ static void tracelog_routed_query( backend_ref_t* bref, GWBUF* buf); +static void dbshard_process_router_options( + ROUTER_INSTANCE* router, + char** options); + + static bool route_session_write( ROUTER_CLIENT_SES* router_client_ses, GWBUF* querybuf, @@ -336,6 +344,153 @@ static void* hfree(void* fval) } +/** + * Updates the hashtable with the database names and where to find them, adding new and removing obsolete pairs. + * @param backends Backends to query for database names + * @param hashtable Hashtable to use + * @return True if all database and server names were successfully retrieved otherwise false + */ +bool update_dbnames_hash(BACKEND** backends, HASHTABLE* hashtable) +{ + const unsigned int connect_timeout = 5; + const unsigned int read_timeout = 2; + bool rval = true; + SERVER* server; + MYSQL* handle; + MYSQL_RES* result; + MYSQL_ROW row; + int i, rc, numfields; + + for(i = 0;backends[i] && rval;i++){ + + handle = mysql_init(NULL); + + if(handle == NULL){ + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,"Error: Failed to initialize MySQL handle."))); + continue; + } + + rc = 0; + rc |= mysql_options(handle, MYSQL_OPT_CONNECT_TIMEOUT, (void *)&connect_timeout); + rc |= mysql_options(handle, MYSQL_OPT_READ_TIMEOUT, (void *)&read_timeout); + if(rc != 0){ + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,"Error: Failed to set MySQL connection options."))); + mysql_close(handle); + rval = false; + continue; + } + + server = backends[i]->backend_server; + ss_dassert(server != NULL); + + if (mysql_real_connect(handle, + server->name, + server->monuser, + server->monpw, + NULL, + server->port, + NULL, + 0) == NULL) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error: Failed to connect to backend server '%s'.",server->name))); + rval = false; + goto cleanup; + } + + /** + * The server was successfully connected to, proceed to query for database names + */ + + if((result = mysql_list_dbs(handle,NULL)) == NULL){ + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + "Error: Failed to execute query in backend server '%s'.",server->name))); + goto cleanup; + } + + numfields = mysql_num_fields(result); + + if(numfields < 1){ + LOGIF(LT, (skygw_log_write_flush(LOGFILE_TRACE, + "Backend '%s' has no databases.",server->name))); + goto cleanup; + } + + /** + * Walk through the list of databases in this backend + * and insert them into the hashtable. If the value is already in the hashtable + * but the backend isn't in the list of backends it is replaced with the first found backend. + */ + + while((row = mysql_fetch_row(result))) + { + + unsigned long *lengths; + char *dbnm = NULL,*servnm = NULL; + + lengths = mysql_fetch_lengths(result); + dbnm = (char*)calloc(lengths[0] + 1,sizeof(char)); + memcpy(dbnm,row[0],lengths[0]); + servnm = strdup(server->unique_name); + + if(hashtable_add(hashtable,dbnm,servnm) == 0){ + + /*Check if the failure was due to a duplicate value*/ + if(hashtable_fetch(hashtable,dbnm) == NULL){ + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + "Error: Failed to insert values into hashtable."))); + }else{ + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + "Error: Duplicate value found."))); + } + rval = false; + free(dbnm); + free(servnm); + } + } + + cleanup: + + if(result){ + mysql_free_result(result); + } + result = NULL; + mysql_close(handle); + } + + return rval; +} + + +/** + * Allocates a new hashtable and inserts database names and where to find them into it. + * @param backends Backends to query for database names + * @return Pointer to the newly allocated hashtable or NULL if an error occurred + */ +void* dbnames_hash_init(BACKEND** backends) +{ + HASHTABLE* htbl = hashtable_alloc(32,hashkeyfun,hashcmpfun); + + if(htbl == NULL) + { + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,"Error: Hashtable allocation failed."))); + return NULL; + } + + + + /**Update the new hashtable with the key-value pairs*/ + if(!update_dbnames_hash(backends,htbl)){ + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,"Errors encountered while querying databases."))); + hashtable_free(htbl); + return NULL; + } + + return htbl; + +} + /** * Implementation of the mandatory version entry point * @@ -599,11 +754,11 @@ createInstance(SERVICE *service, char **options) * is used if any. */ router->rwsplit_version = service->svc_config_version; - + refreshInstance(router, NULL); /** * Get hashtable which includes dbname,backend pairs */ - router->dbnames_hash = dbnames_hash_init(router->servers); + router->dbnames_hash = (HASHTABLE*)dbnames_hash_init(router->servers); /** * We have completed the creation of the router data, so now * insert this router into the linked list of routers @@ -668,7 +823,7 @@ static void* newSession( /** increment rwsplit router's config version number */ router->rwsplit_version = router->service->svc_config_version; /** Read options */ - rwsplit_process_router_options(router, router->service->routerOptions); + dbshard_process_router_options(router, router->service->routerOptions); } /** Copy config struct from router instance */ client_rses->rses_config = router->rwsplit_config; @@ -4331,6 +4486,7 @@ static bool have_enough_servers( } if (nservers < min_nsrv) { + float err_pct = ((float)min_nsrv/(float)router_nsrv)*100.f; LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, "Error : Unable to start %s service. There are " @@ -4339,7 +4495,7 @@ static bool have_enough_servers( "would be required.", router->service->name, (*p_rses)->rses_config.rw_max_slave_conn_percent, - min_nsrv/(router_nsrv/100)))); + (int)err_pct))); } } free(*p_rses); @@ -4646,11 +4802,54 @@ static backend_ref_t* get_root_master_bref( } - - - - - - - - +static void dbshard_process_router_options( + ROUTER_INSTANCE* router, + char** options) +{ + int i; + char* value; + select_criteria_t c; + + for (i = 0; options[i]; i++) + { + if ((value = strchr(options[i], '=')) == NULL) + { + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, "Warning : Unsupported " + "router option \"%s\" for " + "readwritesplit router.", + options[i]))); + } + else + { + *value = 0; + value++; + if (strcmp(options[i], "slave_selection_criteria") == 0) + { + c = GET_SELECT_CRITERIA(value); + ss_dassert( + c == LEAST_GLOBAL_CONNECTIONS || + c == LEAST_ROUTER_CONNECTIONS || + c == LEAST_BEHIND_MASTER || + c == LEAST_CURRENT_OPERATIONS || + c == UNDEFINED_CRITERIA); + + if (c == UNDEFINED_CRITERIA) + { + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, "Warning : Unknown " + "slave selection criteria \"%s\". " + "Allowed values are LEAST_GLOBAL_CONNECTIONS, " + "LEAST_ROUTER_CONNECTIONS, " + "LEAST_BEHIND_MASTER," + "and LEAST_CURRENT_OPERATIONS.", + STRCRITERIA(router->rwsplit_config.rw_slave_select_criteria)))); + } + else + { + router->rwsplit_config.rw_slave_select_criteria = c; + } + } + } + } /*< for */ +}