Merge branch 'release-1.3.0' into doc_fixes
This commit is contained in:
@ -117,7 +117,7 @@ This parameter is used to define the maximum amount of data that will be sent to
|
||||
This parameter is used to enable/disable incomplete transactions detection in binlog router.
|
||||
When MaxScale starts an error message may appear if current binlog file is corrupted or an incomplete transaction is found.
|
||||
During normal operations binlog events are not distributed to the slaves until a COMMIT is seen.
|
||||
The default value is on, set transaction_safety=off to completely disable the incomplete transactions detection.
|
||||
The default value is off, set transaction_safety=on to enable the incomplete transactions detection.
|
||||
|
||||
A complete example of a service entry for a binlog router service would be as follows.
|
||||
```
|
||||
|
@ -3,4 +3,3 @@ set(CPACK_GENERATOR "${CPACK_GENERATOR};DEB")
|
||||
set(CPACK_DEBIAN_PACKAGE_CONTROL_EXTRA "${CMAKE_BINARY_DIR}/postinst;{CMAKE_BINARY_DIR}/postrm")
|
||||
execute_process(COMMAND dpgk --print-architecture OUTPUT_VARIABLE DEB_ARCHITECTURE)
|
||||
set(CPACK_DEBIAN_PACKAGE_ARCHITECTURE ${DEB_ARCHITECTURE})
|
||||
set (CPACK_DEBIAN_PACKAGE_SHLIBDEPS ON)
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -17,7 +17,7 @@ Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
|
||||
Copyright MariaDB Corporation Ab
|
||||
|
||||
*/
|
||||
*/
|
||||
|
||||
/** getpid */
|
||||
#include <my_config.h>
|
||||
@ -33,7 +33,8 @@ EXTERN_C_BLOCK_BEGIN
|
||||
* The meaninful difference is where operation is done and whether master data
|
||||
* is modified
|
||||
*/
|
||||
typedef enum {
|
||||
typedef enum
|
||||
{
|
||||
QUERY_TYPE_UNKNOWN = 0x000000, /*< Initial value, can't be tested bitwisely */
|
||||
QUERY_TYPE_LOCAL_READ = 0x000001, /*< Read non-database data, execute in MaxScale:any */
|
||||
QUERY_TYPE_READ = 0x000002, /*< Read database data:any */
|
||||
@ -41,11 +42,11 @@ typedef enum {
|
||||
QUERY_TYPE_MASTER_READ = 0x000008, /*< Read from the master:master */
|
||||
QUERY_TYPE_SESSION_WRITE = 0x000010, /*< Session data will be modified:master or all */
|
||||
/** Not implemented yet */
|
||||
// QUERY_TYPE_USERVAR_WRITE = 0x000020, /*< Write a user variable:master or all */
|
||||
// QUERY_TYPE_USERVAR_WRITE = 0x000020, /*< Write a user variable:master or all */
|
||||
QUERY_TYPE_USERVAR_READ = 0x000040, /*< Read a user variable:master or any */
|
||||
QUERY_TYPE_SYSVAR_READ = 0x000080, /*< Read a system variable:master or any */
|
||||
/** Not implemented yet */
|
||||
// QUERY_TYPE_SYSVAR_WRITE = 0x000100, /*< Write a system variable:master or all */
|
||||
// QUERY_TYPE_SYSVAR_WRITE = 0x000100, /*< Write a system variable:master or all */
|
||||
QUERY_TYPE_GSYSVAR_READ = 0x000200, /*< Read global system variable:master or any */
|
||||
QUERY_TYPE_GSYSVAR_WRITE = 0x000400, /*< Write global system variable:master or all */
|
||||
QUERY_TYPE_BEGIN_TRX = 0x000800, /*< BEGIN or START TRANSACTION */
|
||||
@ -62,7 +63,8 @@ typedef enum {
|
||||
QUERY_TYPE_SHOW_TABLES = 0x400000 /*< Show list of tables */
|
||||
} skygw_query_type_t;
|
||||
|
||||
typedef enum {
|
||||
typedef enum
|
||||
{
|
||||
QUERY_OP_UNDEFINED = 0,
|
||||
QUERY_OP_SELECT = 1,
|
||||
QUERY_OP_UPDATE = (1 << 1),
|
||||
@ -77,9 +79,10 @@ typedef enum {
|
||||
QUERY_OP_DROP_INDEX = (1 << 10),
|
||||
QUERY_OP_CHANGE_DB = (1 << 11),
|
||||
QUERY_OP_LOAD = (1 << 12)
|
||||
}skygw_query_op_t;
|
||||
} skygw_query_op_t;
|
||||
|
||||
typedef struct parsing_info_st {
|
||||
typedef struct parsing_info_st
|
||||
{
|
||||
#if defined(SS_DEBUG)
|
||||
skygw_chk_t pi_chk_top;
|
||||
#endif
|
||||
@ -100,23 +103,26 @@ typedef struct parsing_info_st {
|
||||
*/
|
||||
skygw_query_type_t query_classifier_get_type(GWBUF* querybuf);
|
||||
skygw_query_op_t query_classifier_get_operation(GWBUF* querybuf);
|
||||
/** Free THD context and close MYSQL */
|
||||
|
||||
#if defined(NOT_USED)
|
||||
char* skygw_query_classifier_get_stmtname(GWBUF* buf);
|
||||
#endif
|
||||
|
||||
char* skygw_get_created_table_name(GWBUF* querybuf);
|
||||
bool is_drop_table_query(GWBUF* querybuf);
|
||||
bool skygw_is_real_query(GWBUF* querybuf);
|
||||
char** skygw_get_table_names(GWBUF* querybuf, int* tblsize, bool fullnames);
|
||||
char* skygw_get_canonical(GWBUF* querybuf);
|
||||
bool parse_query (GWBUF* querybuf);
|
||||
bool parse_query(GWBUF* querybuf);
|
||||
parsing_info_t* parsing_info_init(void (*donefun)(void *));
|
||||
|
||||
/** Free THD context and close MYSQL */
|
||||
void parsing_info_done(void* ptr);
|
||||
bool query_is_parsed(GWBUF* buf);
|
||||
bool skygw_query_has_clause(GWBUF* buf);
|
||||
char* skygw_get_qtype_str(skygw_query_type_t qtype);
|
||||
char* skygw_get_affected_fields(GWBUF* buf);
|
||||
char** skygw_get_database_names(GWBUF* querybuf,int* size);
|
||||
char** skygw_get_database_names(GWBUF* querybuf, int* size);
|
||||
|
||||
EXTERN_C_BLOCK_END
|
||||
|
||||
|
@ -210,7 +210,7 @@ static bool resolve_maxscale_conf_fname(
|
||||
char* cnf_file_arg);
|
||||
|
||||
static char* check_dir_access(char* dirname, bool, bool);
|
||||
static int set_user();
|
||||
static int set_user(const char* user);
|
||||
bool pid_file_exists();
|
||||
void write_child_exit_code(int fd, int code);
|
||||
/** SSL multi-threading functions and structures */
|
||||
@ -968,7 +968,7 @@ static void usage(void)
|
||||
" (default: /etc/)\n"
|
||||
" -D, --datadir=PATH path to data directory, stored embedded mysql tables\n"
|
||||
" (default: /var/cache/maxscale)\n"
|
||||
" -N, --language=PATH apth to errmsg.sys file\n"
|
||||
" -N, --language=PATH path to errmsg.sys file\n"
|
||||
" (default: /var/lib/maxscale)\n"
|
||||
" -P, --piddir=PATH path to PID file directory\n"
|
||||
" (default: /var/run/maxscale)\n"
|
||||
@ -2433,7 +2433,7 @@ static int cnf_preparser(void* data, const char* section, const char* name, cons
|
||||
return 1;
|
||||
}
|
||||
|
||||
static int set_user(char* user)
|
||||
static int set_user(const char* user)
|
||||
{
|
||||
errno = 0;
|
||||
struct passwd *pwname;
|
||||
|
@ -109,13 +109,13 @@ bitmask_set(GWBITMASK *bitmask, int bit)
|
||||
unsigned char mask;
|
||||
|
||||
spinlock_acquire(&bitmask->lock);
|
||||
if (bit >= bitmask->length)
|
||||
while (bit >= bitmask->length)
|
||||
{
|
||||
bitmask->bits = realloc(bitmask->bits,
|
||||
(bitmask->length + BIT_LENGTH_INC) / 8);
|
||||
memset(bitmask->bits + (bitmask->length / 8), 0,
|
||||
BIT_LENGTH_INC / 8);
|
||||
bitmask->length += (BIT_LENGTH_INC / 8);
|
||||
bitmask->length += BIT_LENGTH_INC;
|
||||
}
|
||||
ptr = bitmask->bits + (bit / 8);
|
||||
mask = 1 << (bit % 8);
|
||||
|
@ -33,8 +33,8 @@
|
||||
*/
|
||||
|
||||
/* Both these numbers MUST be exact multiples of 8 */
|
||||
#define BIT_LENGTH_INITIAL 32 /**< Initial number of bits in the bitmask */
|
||||
#define BIT_LENGTH_INC 32 /**< Number of bits to add on each increment */
|
||||
#define BIT_LENGTH_INITIAL 256 /**< Initial number of bits in the bitmask */
|
||||
#define BIT_LENGTH_INC 256 /**< Number of bits to add on each increment */
|
||||
|
||||
/**
|
||||
* The bitmask structure used to store an arbitrary large bitmask
|
||||
|
@ -37,7 +37,8 @@
|
||||
* connections to. This provides the storage for routing module specific data
|
||||
* that is required for each of the backend servers.
|
||||
*/
|
||||
typedef struct backend {
|
||||
typedef struct backend
|
||||
{
|
||||
SERVER *server; /*< The server itself */
|
||||
int current_connection_count; /*< Number of connections to the server */
|
||||
int weight; /*< Desired routing weight */
|
||||
@ -46,7 +47,8 @@ typedef struct backend {
|
||||
/**
|
||||
* The client session structure used within this router.
|
||||
*/
|
||||
typedef struct router_client_session {
|
||||
typedef struct router_client_session
|
||||
{
|
||||
#if defined(SS_DEBUG)
|
||||
skygw_chk_t rses_chk_top;
|
||||
#endif
|
||||
@ -65,16 +67,17 @@ typedef struct router_client_session {
|
||||
/**
|
||||
* The statistics for this router instance
|
||||
*/
|
||||
typedef struct {
|
||||
typedef struct
|
||||
{
|
||||
int n_sessions; /*< Number sessions created */
|
||||
int n_queries; /*< Number of queries forwarded */
|
||||
} ROUTER_STATS;
|
||||
|
||||
|
||||
/**
|
||||
* The per instance data for the router.
|
||||
*/
|
||||
typedef struct router_instance {
|
||||
typedef struct router_instance
|
||||
{
|
||||
SERVICE *service; /*< Pointer to the service using this router */
|
||||
ROUTER_CLIENT_SES *connections; /*< Link list of all the client connections */
|
||||
SPINLOCK lock; /*< Spinlock for the instance data */
|
||||
|
@ -284,7 +284,7 @@ char task_name[BLRM_TASK_NAME_LEN+1] = "";
|
||||
inst->m_errno = 0;
|
||||
inst->m_errmsg = NULL;
|
||||
|
||||
inst->trx_safe = 1;
|
||||
inst->trx_safe = 0;
|
||||
inst->pending_transaction = 0;
|
||||
inst->last_safe_pos = 0;
|
||||
|
||||
|
@ -96,7 +96,8 @@
|
||||
|
||||
#include "modutil.h"
|
||||
|
||||
MODULE_INFO info = {
|
||||
MODULE_INFO info =
|
||||
{
|
||||
MODULE_API_ROUTER,
|
||||
MODULE_GA,
|
||||
ROUTER_VERSION,
|
||||
@ -112,23 +113,16 @@ static void closeSession(ROUTER *instance, void *router_session);
|
||||
static void freeSession(ROUTER *instance, void *router_session);
|
||||
static int routeQuery(ROUTER *instance, void *router_session, GWBUF *queue);
|
||||
static void diagnostics(ROUTER *instance, DCB *dcb);
|
||||
static void clientReply(
|
||||
ROUTER *instance,
|
||||
void *router_session,
|
||||
GWBUF *queue,
|
||||
static void clientReply(ROUTER *instance, void *router_session, GWBUF *queue,
|
||||
DCB *backend_dcb);
|
||||
static void handleError(
|
||||
ROUTER *instance,
|
||||
void *router_session,
|
||||
GWBUF *errbuf,
|
||||
DCB *problem_dcb,
|
||||
error_action_t action,
|
||||
bool *succp);
|
||||
static int getCapabilities ();
|
||||
static void handleError(ROUTER *instance, void *router_session, GWBUF *errbuf,
|
||||
DCB *problem_dcb, error_action_t action, bool *succp);
|
||||
static int getCapabilities();
|
||||
|
||||
|
||||
/** The module object definition */
|
||||
static ROUTER_OBJECT MyObject = {
|
||||
static ROUTER_OBJECT MyObject =
|
||||
{
|
||||
createInstance,
|
||||
newSession,
|
||||
closeSession,
|
||||
@ -140,16 +134,12 @@ static ROUTER_OBJECT MyObject = {
|
||||
getCapabilities
|
||||
};
|
||||
|
||||
static bool rses_begin_locked_router_action(
|
||||
ROUTER_CLIENT_SES* rses);
|
||||
static bool rses_begin_locked_router_action(ROUTER_CLIENT_SES* rses);
|
||||
|
||||
static void rses_end_locked_router_action(
|
||||
ROUTER_CLIENT_SES* rses);
|
||||
static void rses_end_locked_router_action(ROUTER_CLIENT_SES* rses);
|
||||
|
||||
static BACKEND *get_root_master(
|
||||
BACKEND **servers);
|
||||
static int handle_state_switch(
|
||||
DCB* dcb,DCB_REASON reason, void * routersession);
|
||||
static BACKEND *get_root_master(BACKEND **servers);
|
||||
static int handle_state_switch(DCB* dcb, DCB_REASON reason, void * routersession);
|
||||
static SPINLOCK instlock;
|
||||
static ROUTER_INSTANCE *instances;
|
||||
|
||||
@ -202,14 +192,15 @@ GetModuleObject()
|
||||
static ROUTER *
|
||||
createInstance(SERVICE *service, char **options)
|
||||
{
|
||||
ROUTER_INSTANCE *inst;
|
||||
SERVER *server;
|
||||
SERVER_REF *sref;
|
||||
int i, n;
|
||||
BACKEND *backend;
|
||||
char *weightby;
|
||||
ROUTER_INSTANCE *inst;
|
||||
SERVER *server;
|
||||
SERVER_REF *sref;
|
||||
int i, n;
|
||||
BACKEND *backend;
|
||||
char *weightby;
|
||||
|
||||
if ((inst = calloc(1, sizeof(ROUTER_INSTANCE))) == NULL) {
|
||||
if ((inst = calloc(1, sizeof(ROUTER_INSTANCE))) == NULL)
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
|
||||
@ -222,9 +213,11 @@ char *weightby;
|
||||
* backend server.
|
||||
*/
|
||||
for (sref = service->dbref, n = 0; sref; sref = sref->next)
|
||||
{
|
||||
n++;
|
||||
}
|
||||
|
||||
inst->servers = (BACKEND **)calloc(n + 1, sizeof(BACKEND *));
|
||||
inst->servers = (BACKEND **) calloc(n + 1, sizeof(BACKEND *));
|
||||
if (!inst->servers)
|
||||
{
|
||||
free(inst);
|
||||
@ -236,7 +229,9 @@ char *weightby;
|
||||
if ((inst->servers[n] = malloc(sizeof(BACKEND))) == NULL)
|
||||
{
|
||||
for (i = 0; i < n; i++)
|
||||
{
|
||||
free(inst->servers[i]);
|
||||
}
|
||||
free(inst->servers);
|
||||
free(inst);
|
||||
return NULL;
|
||||
@ -325,12 +320,12 @@ char *weightby;
|
||||
{
|
||||
if (!strcasecmp(options[i], "master"))
|
||||
{
|
||||
inst->bitmask |= (SERVER_MASTER|SERVER_SLAVE);
|
||||
inst->bitmask |= (SERVER_MASTER | SERVER_SLAVE);
|
||||
inst->bitvalue |= SERVER_MASTER;
|
||||
}
|
||||
else if (!strcasecmp(options[i], "slave"))
|
||||
{
|
||||
inst->bitmask |= (SERVER_MASTER|SERVER_SLAVE);
|
||||
inst->bitmask |= (SERVER_MASTER | SERVER_SLAVE);
|
||||
inst->bitvalue |= SERVER_SLAVE;
|
||||
}
|
||||
else if (!strcasecmp(options[i], "running"))
|
||||
@ -358,7 +353,7 @@ char *weightby;
|
||||
}
|
||||
}
|
||||
}
|
||||
if(inst->bitmask == 0 && inst->bitvalue == 0)
|
||||
if (inst->bitmask == 0 && inst->bitvalue == 0)
|
||||
{
|
||||
/** No parameters given, use RUNNING as a valid server */
|
||||
inst->bitmask |= (SERVER_RUNNING);
|
||||
@ -374,7 +369,7 @@ char *weightby;
|
||||
instances = inst;
|
||||
spinlock_release(&instlock);
|
||||
|
||||
return (ROUTER *)inst;
|
||||
return(ROUTER *) inst;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -387,11 +382,11 @@ char *weightby;
|
||||
static void *
|
||||
newSession(ROUTER *instance, SESSION *session)
|
||||
{
|
||||
ROUTER_INSTANCE *inst = (ROUTER_INSTANCE *)instance;
|
||||
ROUTER_CLIENT_SES *client_rses;
|
||||
BACKEND *candidate = NULL;
|
||||
int i;
|
||||
BACKEND *master_host = NULL;
|
||||
ROUTER_INSTANCE *inst = (ROUTER_INSTANCE *) instance;
|
||||
ROUTER_CLIENT_SES *client_rses;
|
||||
BACKEND *candidate = NULL;
|
||||
int i;
|
||||
BACKEND *master_host = NULL;
|
||||
|
||||
MXS_DEBUG("%lu [newSession] new router session with session "
|
||||
"%p, and inst %p.",
|
||||
@ -400,9 +395,10 @@ BACKEND *master_host = NULL;
|
||||
inst);
|
||||
|
||||
|
||||
client_rses = (ROUTER_CLIENT_SES *)calloc(1, sizeof(ROUTER_CLIENT_SES));
|
||||
client_rses = (ROUTER_CLIENT_SES *) calloc(1, sizeof(ROUTER_CLIENT_SES));
|
||||
|
||||
if (client_rses == NULL) {
|
||||
if (client_rses == NULL)
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
|
||||
@ -434,8 +430,10 @@ BACKEND *master_host = NULL;
|
||||
* become the new candidate. This has the effect of spreading the
|
||||
* connections over different servers during periods of very low load.
|
||||
*/
|
||||
for (i = 0; inst->servers[i]; i++) {
|
||||
if(inst->servers[i]) {
|
||||
for (i = 0; inst->servers[i]; i++)
|
||||
{
|
||||
if (inst->servers[i])
|
||||
{
|
||||
MXS_DEBUG("%lu [newSession] Examine server in port %d with "
|
||||
"%d connections. Status is %s, "
|
||||
"inst->bitvalue is %d",
|
||||
@ -447,18 +445,24 @@ BACKEND *master_host = NULL;
|
||||
}
|
||||
|
||||
if (SERVER_IN_MAINT(inst->servers[i]->server))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
if (inst->servers[i]->weight == 0)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
/* Check server status bits against bitvalue from router_options */
|
||||
if (inst->servers[i] &&
|
||||
SERVER_IS_RUNNING(inst->servers[i]->server) &&
|
||||
(inst->servers[i]->server->status & inst->bitmask & inst->bitvalue))
|
||||
{
|
||||
if (master_host) {
|
||||
if (inst->servers[i] == master_host && (inst->bitvalue & SERVER_SLAVE)) {
|
||||
if (master_host)
|
||||
{
|
||||
if (inst->servers[i] == master_host && (inst->bitvalue & SERVER_SLAVE))
|
||||
{
|
||||
/* skip root Master here, as it could also be slave of an external server
|
||||
* that is not in the configuration.
|
||||
* Intermediate masters (Relay Servers) are also slave and will be selected
|
||||
@ -467,7 +471,8 @@ BACKEND *master_host = NULL;
|
||||
|
||||
continue;
|
||||
}
|
||||
if (inst->servers[i] == master_host && (inst->bitvalue & SERVER_MASTER)) {
|
||||
if (inst->servers[i] == master_host && (inst->bitvalue & SERVER_MASTER))
|
||||
{
|
||||
/* If option is "master" return only the root Master as there
|
||||
* could be intermediate masters (Relay Servers)
|
||||
* and they must not be selected.
|
||||
@ -476,12 +481,15 @@ BACKEND *master_host = NULL;
|
||||
candidate = master_host;
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
}
|
||||
else
|
||||
{
|
||||
/* master_host is NULL, no master server.
|
||||
* If requested router_option is 'master'
|
||||
* candidate wll be NULL.
|
||||
*/
|
||||
if (inst->bitvalue & SERVER_MASTER) {
|
||||
if (inst->bitvalue & SERVER_MASTER)
|
||||
{
|
||||
candidate = NULL;
|
||||
break;
|
||||
}
|
||||
@ -493,18 +501,18 @@ BACKEND *master_host = NULL;
|
||||
{
|
||||
candidate = inst->servers[i];
|
||||
}
|
||||
else if ((inst->servers[i]->current_connection_count
|
||||
else if (((inst->servers[i]->current_connection_count + 1)
|
||||
* 1000) / inst->servers[i]->weight <
|
||||
(candidate->current_connection_count *
|
||||
((candidate->current_connection_count + 1) *
|
||||
1000) / candidate->weight)
|
||||
{
|
||||
/* This running server has fewer
|
||||
connections, set it as a new candidate */
|
||||
candidate = inst->servers[i];
|
||||
}
|
||||
else if ((inst->servers[i]->current_connection_count
|
||||
else if (((inst->servers[i]->current_connection_count + 1)
|
||||
* 1000) / inst->servers[i]->weight ==
|
||||
(candidate->current_connection_count *
|
||||
((candidate->current_connection_count + 1) *
|
||||
1000) / candidate->weight &&
|
||||
inst->servers[i]->server->stats.n_connections <
|
||||
candidate->server->stats.n_connections)
|
||||
@ -522,10 +530,14 @@ BACKEND *master_host = NULL;
|
||||
* With router_option=slave a master_host could be set, so route traffic there.
|
||||
* Otherwise, just clean up and return NULL
|
||||
*/
|
||||
if (!candidate) {
|
||||
if (master_host) {
|
||||
if (!candidate)
|
||||
{
|
||||
if (master_host)
|
||||
{
|
||||
candidate = master_host;
|
||||
} else {
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_ERROR("Failed to create new routing session. "
|
||||
"Couldn't find eligible candidate server. Freeing "
|
||||
"allocated resources.");
|
||||
@ -582,7 +594,8 @@ BACKEND *master_host = NULL;
|
||||
"Connections : %d",
|
||||
candidate->server->unique_name,
|
||||
candidate->current_connection_count);
|
||||
return (void *)client_rses;
|
||||
|
||||
return(void *) client_rses;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -602,13 +615,11 @@ BACKEND *master_host = NULL;
|
||||
* @details (write detailed description here)
|
||||
*
|
||||
*/
|
||||
static void freeSession(
|
||||
ROUTER* router_instance,
|
||||
void* router_client_ses)
|
||||
static void freeSession(ROUTER* router_instance, void* router_client_ses)
|
||||
{
|
||||
ROUTER_INSTANCE* router = (ROUTER_INSTANCE *)router_instance;
|
||||
ROUTER_INSTANCE* router = (ROUTER_INSTANCE *) router_instance;
|
||||
ROUTER_CLIENT_SES* router_cli_ses =
|
||||
(ROUTER_CLIENT_SES *)router_client_ses;
|
||||
(ROUTER_CLIENT_SES *) router_client_ses;
|
||||
int prev_val;
|
||||
|
||||
prev_val = atomic_add(&router_cli_ses->backend->current_connection_count, -1);
|
||||
@ -616,16 +627,21 @@ static void freeSession(
|
||||
|
||||
spinlock_acquire(&router->lock);
|
||||
|
||||
if (router->connections == router_cli_ses) {
|
||||
if (router->connections == router_cli_ses)
|
||||
{
|
||||
router->connections = router_cli_ses->next;
|
||||
} else {
|
||||
}
|
||||
else
|
||||
{
|
||||
ROUTER_CLIENT_SES *ptr = router->connections;
|
||||
|
||||
while (ptr != NULL && ptr->next != router_cli_ses) {
|
||||
while (ptr != NULL && ptr->next != router_cli_ses)
|
||||
{
|
||||
ptr = ptr->next;
|
||||
}
|
||||
|
||||
if (ptr != NULL) {
|
||||
if (ptr != NULL)
|
||||
{
|
||||
ptr->next = router_cli_ses->next;
|
||||
}
|
||||
}
|
||||
@ -637,12 +653,11 @@ static void freeSession(
|
||||
router_cli_ses,
|
||||
router,
|
||||
router_cli_ses->backend->server->port,
|
||||
prev_val-1);
|
||||
prev_val - 1);
|
||||
|
||||
free(router_cli_ses);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Close a session with the router, this is the mechanism
|
||||
* by which a router may cleanup data structure etc.
|
||||
@ -653,8 +668,8 @@ static void freeSession(
|
||||
static void
|
||||
closeSession(ROUTER *instance, void *router_session)
|
||||
{
|
||||
ROUTER_CLIENT_SES *router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
|
||||
DCB* backend_dcb;
|
||||
ROUTER_CLIENT_SES *router_cli_ses = (ROUTER_CLIENT_SES *) router_session;
|
||||
DCB* backend_dcb;
|
||||
|
||||
CHK_CLIENT_RSES(router_cli_ses);
|
||||
/**
|
||||
@ -673,7 +688,8 @@ DCB* backend_dcb;
|
||||
/**
|
||||
* Close the backend server connection
|
||||
*/
|
||||
if (backend_dcb != NULL) {
|
||||
if (backend_dcb != NULL)
|
||||
{
|
||||
CHK_DCB(backend_dcb);
|
||||
dcb_close(backend_dcb);
|
||||
}
|
||||
@ -693,8 +709,8 @@ DCB* backend_dcb;
|
||||
static int
|
||||
routeQuery(ROUTER *instance, void *router_session, GWBUF *queue)
|
||||
{
|
||||
ROUTER_INSTANCE *inst = (ROUTER_INSTANCE *)instance;
|
||||
ROUTER_CLIENT_SES *router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
|
||||
ROUTER_INSTANCE *inst = (ROUTER_INSTANCE *) instance;
|
||||
ROUTER_CLIENT_SES *router_cli_ses = (ROUTER_CLIENT_SES *) router_session;
|
||||
uint8_t *payload = GWBUF_DATA(queue);
|
||||
int mysql_command;
|
||||
int rc;
|
||||
@ -729,21 +745,22 @@ routeQuery(ROUTER *instance, void *router_session, GWBUF *queue)
|
||||
{
|
||||
MXS_ERROR("Failed to route MySQL command %d to backend "
|
||||
"server.%s",
|
||||
mysql_command,rses_is_closed ? " Session is closed." : "");
|
||||
mysql_command, rses_is_closed ? " Session is closed." : "");
|
||||
rc = 0;
|
||||
while((queue = GWBUF_CONSUME_ALL(queue)) != NULL);
|
||||
while ((queue = GWBUF_CONSUME_ALL(queue)) != NULL)
|
||||
{
|
||||
;
|
||||
}
|
||||
goto return_rc;
|
||||
|
||||
}
|
||||
|
||||
char* trc = NULL;
|
||||
|
||||
switch(mysql_command) {
|
||||
switch (mysql_command)
|
||||
{
|
||||
case MYSQL_COM_CHANGE_USER:
|
||||
rc = backend_dcb->func.auth(
|
||||
backend_dcb,
|
||||
NULL,
|
||||
backend_dcb->session,
|
||||
rc = backend_dcb->func.auth(backend_dcb, NULL, backend_dcb->session,
|
||||
queue);
|
||||
break;
|
||||
case MYSQL_COM_QUERY:
|
||||
@ -759,10 +776,12 @@ routeQuery(ROUTER *instance, void *router_session, GWBUF *queue)
|
||||
MXS_INFO("Routed [%s] to '%s'%s%s",
|
||||
STRPACKETTYPE(mysql_command),
|
||||
backend_dcb->server->unique_name,
|
||||
trc?": ":".",
|
||||
trc?trc:"");
|
||||
trc ? ": " : ".",
|
||||
trc ? trc : "");
|
||||
free(trc);
|
||||
|
||||
return_rc:
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
@ -775,11 +794,11 @@ return_rc:
|
||||
static void
|
||||
diagnostics(ROUTER *router, DCB *dcb)
|
||||
{
|
||||
ROUTER_INSTANCE *router_inst = (ROUTER_INSTANCE *)router;
|
||||
ROUTER_CLIENT_SES *session;
|
||||
int i = 0;
|
||||
BACKEND *backend;
|
||||
char *weightby;
|
||||
ROUTER_INSTANCE *router_inst = (ROUTER_INSTANCE *) router;
|
||||
ROUTER_CLIENT_SES *session;
|
||||
int i = 0;
|
||||
BACKEND *backend;
|
||||
char *weightby;
|
||||
|
||||
spinlock_acquire(&router_inst->lock);
|
||||
session = router_inst->connections;
|
||||
@ -808,7 +827,7 @@ char *weightby;
|
||||
backend = router_inst->servers[i];
|
||||
dcb_printf(dcb, "\t\t%-20s %3.1f%% %d\n",
|
||||
backend->server->unique_name,
|
||||
(float)backend->weight / 10,
|
||||
(float) backend->weight / 10,
|
||||
backend->current_connection_count);
|
||||
}
|
||||
|
||||
@ -826,11 +845,7 @@ char *weightby;
|
||||
* @param queue The GWBUF with reply data
|
||||
*/
|
||||
static void
|
||||
clientReply(
|
||||
ROUTER *instance,
|
||||
void *router_session,
|
||||
GWBUF *queue,
|
||||
DCB *backend_dcb)
|
||||
clientReply(ROUTER *instance, void *router_session, GWBUF *queue, DCB *backend_dcb)
|
||||
{
|
||||
ss_dassert(backend_dcb->session->client != NULL);
|
||||
SESSION_ROUTE_REPLY(backend_dcb->session, queue);
|
||||
@ -849,19 +864,14 @@ clientReply(
|
||||
* @param succp Result of action: true if router can continue
|
||||
*
|
||||
*/
|
||||
static void handleError(
|
||||
ROUTER *instance,
|
||||
void *router_session,
|
||||
GWBUF *errbuf,
|
||||
DCB *problem_dcb,
|
||||
error_action_t action,
|
||||
bool *succp)
|
||||
static void handleError(ROUTER *instance, void *router_session, GWBUF *errbuf,
|
||||
DCB *problem_dcb, error_action_t action, bool *succp)
|
||||
|
||||
{
|
||||
DCB *client_dcb;
|
||||
SESSION *session = problem_dcb->session;
|
||||
session_state_t sesstate;
|
||||
ROUTER_CLIENT_SES *router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
|
||||
ROUTER_CLIENT_SES *router_cli_ses = (ROUTER_CLIENT_SES *) router_session;
|
||||
|
||||
/** Don't handle same error twice on same DCB */
|
||||
if (problem_dcb->dcb_errhandle_called)
|
||||
@ -904,6 +914,7 @@ static void handleError(
|
||||
}
|
||||
|
||||
/** to be inline'd */
|
||||
|
||||
/**
|
||||
* @node Acquires lock to router client session if it is not closed.
|
||||
*
|
||||
@ -919,18 +930,19 @@ static void handleError(
|
||||
* @details (write detailed description here)
|
||||
*
|
||||
*/
|
||||
static bool rses_begin_locked_router_action(
|
||||
ROUTER_CLIENT_SES* rses)
|
||||
static bool rses_begin_locked_router_action(ROUTER_CLIENT_SES* rses)
|
||||
{
|
||||
bool succp = false;
|
||||
|
||||
CHK_CLIENT_RSES(rses);
|
||||
|
||||
if (rses->rses_closed) {
|
||||
if (rses->rses_closed)
|
||||
{
|
||||
goto return_succp;
|
||||
}
|
||||
spinlock_acquire(&rses->rses_lock);
|
||||
if (rses->rses_closed) {
|
||||
if (rses->rses_closed)
|
||||
{
|
||||
spinlock_release(&rses->rses_lock);
|
||||
goto return_succp;
|
||||
}
|
||||
@ -941,6 +953,7 @@ return_succp:
|
||||
}
|
||||
|
||||
/** to be inline'd */
|
||||
|
||||
/**
|
||||
* @node Releases router client session lock.
|
||||
*
|
||||
@ -954,14 +967,12 @@ return_succp:
|
||||
* @details (write detailed description here)
|
||||
*
|
||||
*/
|
||||
static void rses_end_locked_router_action(
|
||||
ROUTER_CLIENT_SES* rses)
|
||||
static void rses_end_locked_router_action(ROUTER_CLIENT_SES* rses)
|
||||
{
|
||||
CHK_CLIENT_RSES(rses);
|
||||
spinlock_release(&rses->rses_lock);
|
||||
}
|
||||
|
||||
|
||||
static int getCapabilities()
|
||||
{
|
||||
return RCAP_TYPE_PACKET_INPUT;
|
||||
@ -980,31 +991,35 @@ static int getCapabilities()
|
||||
*
|
||||
*/
|
||||
|
||||
static BACKEND *get_root_master(BACKEND **servers) {
|
||||
static BACKEND *get_root_master(BACKEND **servers)
|
||||
{
|
||||
int i = 0;
|
||||
BACKEND *master_host = NULL;
|
||||
|
||||
for (i = 0; servers[i]; i++) {
|
||||
if (servers[i] && (servers[i]->server->status & (SERVER_MASTER|SERVER_MAINT)) == SERVER_MASTER) {
|
||||
if (master_host && servers[i]->server->depth < master_host->server->depth) {
|
||||
master_host = servers[i];
|
||||
} else {
|
||||
if (master_host == NULL) {
|
||||
for (i = 0; servers[i]; i++)
|
||||
{
|
||||
if (servers[i] && (servers[i]->server->status & (SERVER_MASTER | SERVER_MAINT)) == SERVER_MASTER)
|
||||
{
|
||||
if (master_host && servers[i]->server->depth < master_host->server->depth)
|
||||
{
|
||||
master_host = servers[i];
|
||||
}
|
||||
else if (master_host == NULL)
|
||||
{
|
||||
master_host = servers[i];
|
||||
}
|
||||
}
|
||||
}
|
||||
return master_host;
|
||||
}
|
||||
|
||||
static int handle_state_switch(DCB* dcb,DCB_REASON reason, void * routersession)
|
||||
static int handle_state_switch(DCB* dcb, DCB_REASON reason, void * routersession)
|
||||
{
|
||||
ss_dassert(dcb != NULL);
|
||||
SESSION* session = dcb->session;
|
||||
ROUTER_CLIENT_SES* rses = (ROUTER_CLIENT_SES*)routersession;
|
||||
ROUTER_CLIENT_SES* rses = (ROUTER_CLIENT_SES*) routersession;
|
||||
SERVICE* service = session->service;
|
||||
ROUTER* router = (ROUTER *)service->router;
|
||||
ROUTER* router = (ROUTER *) service->router;
|
||||
|
||||
if (NULL == dcb->session->router_session && DCB_REASON_ERROR != reason)
|
||||
{
|
||||
@ -1014,7 +1029,7 @@ static int handle_state_switch(DCB* dcb,DCB_REASON reason, void * routersession)
|
||||
*/
|
||||
return 0;
|
||||
}
|
||||
switch(reason)
|
||||
switch (reason)
|
||||
{
|
||||
case DCB_REASON_CLOSE:
|
||||
dcb->func.close(dcb);
|
||||
|
@ -4670,7 +4670,7 @@ static void handleError (
|
||||
ROUTER* instance,
|
||||
void* router_session,
|
||||
GWBUF* errmsgbuf,
|
||||
DCB* backend_dcb,
|
||||
DCB* problem_dcb,
|
||||
error_action_t action,
|
||||
bool* succp)
|
||||
{
|
||||
@ -4678,10 +4678,10 @@ static void handleError (
|
||||
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance;
|
||||
ROUTER_CLIENT_SES* rses = (ROUTER_CLIENT_SES *)router_session;
|
||||
|
||||
CHK_DCB(backend_dcb);
|
||||
CHK_DCB(problem_dcb);
|
||||
|
||||
/** Don't handle same error twice on same DCB */
|
||||
if (backend_dcb->dcb_errhandle_called)
|
||||
if (problem_dcb->dcb_errhandle_called)
|
||||
{
|
||||
/** we optimistically assume that previous call succeed */
|
||||
/*
|
||||
@ -4693,13 +4693,17 @@ static void handleError (
|
||||
}
|
||||
else
|
||||
{
|
||||
backend_dcb->dcb_errhandle_called = true;
|
||||
problem_dcb->dcb_errhandle_called = true;
|
||||
}
|
||||
session = backend_dcb->session;
|
||||
session = problem_dcb->session;
|
||||
|
||||
if (session == NULL || rses == NULL)
|
||||
{
|
||||
*succp = false;
|
||||
}
|
||||
else if (dcb_isclient(problem_dcb))
|
||||
{
|
||||
*succp = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -4721,11 +4725,11 @@ static void handleError (
|
||||
* If master has lost its Master status error can't be
|
||||
* handled so that session could continue.
|
||||
*/
|
||||
if (rses->rses_master_ref->bref_dcb == backend_dcb &&
|
||||
if (rses->rses_master_ref->bref_dcb == problem_dcb &&
|
||||
!SERVER_IS_MASTER(srv))
|
||||
{
|
||||
backend_ref_t* bref;
|
||||
bref = get_bref_from_dcb(rses, backend_dcb);
|
||||
bref = get_bref_from_dcb(rses, problem_dcb);
|
||||
if (bref != NULL)
|
||||
{
|
||||
CHK_BACKEND_REF(bref);
|
||||
@ -4739,7 +4743,7 @@ static void handleError (
|
||||
"corresponding backend ref.",
|
||||
srv->name,
|
||||
srv->port);
|
||||
dcb_close(backend_dcb);
|
||||
dcb_close(problem_dcb);
|
||||
}
|
||||
if (!srv->master_err_is_logged)
|
||||
{
|
||||
@ -4761,7 +4765,7 @@ static void handleError (
|
||||
*/
|
||||
*succp = handle_error_new_connection(inst,
|
||||
&rses,
|
||||
backend_dcb,
|
||||
problem_dcb,
|
||||
errmsgbuf);
|
||||
}
|
||||
/* Free the lock if rses still exists */
|
||||
@ -4773,7 +4777,7 @@ static void handleError (
|
||||
{
|
||||
handle_error_reply_client(session,
|
||||
rses,
|
||||
backend_dcb,
|
||||
problem_dcb,
|
||||
errmsgbuf);
|
||||
*succp = false; /*< no new backend servers were made available */
|
||||
break;
|
||||
@ -4784,7 +4788,7 @@ static void handleError (
|
||||
break;
|
||||
}
|
||||
}
|
||||
dcb_close(backend_dcb);
|
||||
dcb_close(problem_dcb);
|
||||
}
|
||||
|
||||
|
||||
|
@ -4018,7 +4018,7 @@ static void handleError (
|
||||
ROUTER* instance,
|
||||
void* router_session,
|
||||
GWBUF* errmsgbuf,
|
||||
DCB* backend_dcb,
|
||||
DCB* problem_dcb,
|
||||
error_action_t action,
|
||||
bool* succp)
|
||||
{
|
||||
@ -4026,10 +4026,10 @@ static void handleError (
|
||||
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance;
|
||||
ROUTER_CLIENT_SES* rses = (ROUTER_CLIENT_SES *)router_session;
|
||||
|
||||
CHK_DCB(backend_dcb);
|
||||
CHK_DCB(problem_dcb);
|
||||
|
||||
/** Don't handle same error twice on same DCB */
|
||||
if (backend_dcb->dcb_errhandle_called)
|
||||
if (problem_dcb->dcb_errhandle_called)
|
||||
{
|
||||
/** we optimistically assume that previous call succeed */
|
||||
*succp = true;
|
||||
@ -4037,14 +4037,18 @@ static void handleError (
|
||||
}
|
||||
else
|
||||
{
|
||||
backend_dcb->dcb_errhandle_called = true;
|
||||
problem_dcb->dcb_errhandle_called = true;
|
||||
}
|
||||
session = backend_dcb->session;
|
||||
session = problem_dcb->session;
|
||||
|
||||
if (session == NULL || rses == NULL)
|
||||
{
|
||||
*succp = false;
|
||||
}
|
||||
else if (dcb_isclient(problem_dcb))
|
||||
{
|
||||
*succp = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
CHK_SESSION(session);
|
||||
@ -4064,7 +4068,7 @@ static void handleError (
|
||||
*/
|
||||
*succp = handle_error_new_connection(inst,
|
||||
rses,
|
||||
backend_dcb,
|
||||
problem_dcb,
|
||||
errmsgbuf);
|
||||
rses_end_locked_router_action(rses);
|
||||
break;
|
||||
@ -4074,7 +4078,7 @@ static void handleError (
|
||||
{
|
||||
handle_error_reply_client(session,
|
||||
rses,
|
||||
backend_dcb,
|
||||
problem_dcb,
|
||||
errmsgbuf);
|
||||
*succp = false; /*< no new backend servers were made available */
|
||||
break;
|
||||
@ -4085,7 +4089,7 @@ static void handleError (
|
||||
break;
|
||||
}
|
||||
}
|
||||
dcb_close(backend_dcb);
|
||||
dcb_close(problem_dcb);
|
||||
}
|
||||
|
||||
|
||||
|
Reference in New Issue
Block a user