In dcb.c:dcb_close, the DCB is removed from polling either before or after the call to dcb->func.close. Since the MySQL backend protocol sends COM_QUIT, and thus writes to the backend DCB, a backend DCB is kept in DCB_STATE_POLLING until the write is completed.

dcb.h: define ERRHANDLE temporarily, since the changes are still behind that macro.
Defined two error handling actions in router.h: ERRACT_NEW_CONNECTION and ERRACT_REPLY_CLIENT.
A failed database is now logged to the error log and the message log at increasingly long intervals, due to changes in mysql_mon.c. Added two new members to MONITOR_SERVERS, mon_err_count and mon_prev_status, so that each backend can be treated individually.

Error handling: if mysql_backend.c:dcb_read fails, router's handleError is called instead of closing session.
	If mysql_client.c:SESSION_ROUTE_QUERY fails router's handleError is called instead of sending error to client.

	readwritesplit.c:select_connect_backend_servers is modified so that it can be called during an active router session. When called, it attempts to find one master and up to the configured maximum number of slaves in the correct state, if necessary.
	When handleError needs to replace failed unit it now calls select_connect_backend_servers.
This commit is contained in:
VilhoRaatikka
2014-06-08 19:36:12 +03:00
parent 916b763685
commit 889bdd4f8c
8 changed files with 298 additions and 121 deletions

View File

@ -1027,6 +1027,9 @@ void
dcb_close(DCB *dcb) dcb_close(DCB *dcb)
{ {
int rc; int rc;
#if defined(ERRHANDLE)
bool isclient;
#endif
CHK_DCB(dcb); CHK_DCB(dcb);
/*< /*<
@ -1044,15 +1047,21 @@ dcb_close(DCB *dcb)
dcb->state == DCB_STATE_NOPOLLING || dcb->state == DCB_STATE_NOPOLLING ||
dcb->state == DCB_STATE_ZOMBIE); dcb->state == DCB_STATE_ZOMBIE);
/*<
* Stop dcb's listening and modify state accordingly.
*/
rc = poll_remove_dcb(dcb);
ss_dassert(dcb->state == DCB_STATE_NOPOLLING ||
dcb->state == DCB_STATE_ZOMBIE);
#if defined(ERRHANDLE) #if defined(ERRHANDLE)
isclient = dcb_isclient(dcb);
if (isclient)
{
/*<
* Stop dcb's listening and modify state accordingly.
*/
rc = poll_remove_dcb(dcb);
ss_dassert(dcb->state == DCB_STATE_NOPOLLING ||
dcb->state == DCB_STATE_ZOMBIE);
}
/** /**
* close protocol and router session * close protocol and router session
*/ */
@ -1060,6 +1069,25 @@ dcb_close(DCB *dcb)
{ {
dcb->func.close(dcb); dcb->func.close(dcb);
} }
if (!isclient)
{
/*<
* Stop dcb's listening and modify state accordingly.
*/
rc = poll_remove_dcb(dcb);
ss_dassert(dcb->state == DCB_STATE_NOPOLLING ||
dcb->state == DCB_STATE_ZOMBIE);
}
#else
/*<
* Stop dcb's listening and modify state accordingly.
*/
rc = poll_remove_dcb(dcb);
ss_dassert(dcb->state == DCB_STATE_NOPOLLING ||
dcb->state == DCB_STATE_ZOMBIE);
#endif #endif
dcb_call_callback(dcb, DCB_REASON_CLOSE); dcb_call_callback(dcb, DCB_REASON_CLOSE);

View File

@ -24,7 +24,7 @@
#include <skygw_utils.h> #include <skygw_utils.h>
#include <netinet/in.h> #include <netinet/in.h>
// #define ERRHANDLE #define ERRHANDLE
struct session; struct session;
struct server; struct server;

View File

@ -97,4 +97,10 @@ typedef enum router_capability_t {
RCAP_TYPE_PACKET_INPUT = (1 << 1) RCAP_TYPE_PACKET_INPUT = (1 << 1)
} router_capability_t; } router_capability_t;
/**
 * Error handling actions. A value of this type is passed as the
 * 'action' argument of a router's handleError entry point to tell the
 * router how it should react to the reported failure.
 */
typedef enum error_action {
/*< Try to (re)establish backend connections for the session */
ERRACT_NEW_CONNECTION = 0x001,
/*<
 * NOTE(review): presumably "send an error reply to the client"; no
 * handler for this action is visible alongside this definition - confirm.
 */
ERRACT_REPLY_CLIENT = 0x002
} error_action_t;
#endif #endif

View File

@ -73,6 +73,8 @@ static void diagnostics(DCB *, void *);
static void setInterval(void *, unsigned long); static void setInterval(void *, unsigned long);
static void defaultId(void *, unsigned long); static void defaultId(void *, unsigned long);
static void replicationHeartbeat(void *, int); static void replicationHeartbeat(void *, int);
static bool mon_status_changed(MONITOR_SERVERS* mon_srv);
static bool mon_print_fail_status(MONITOR_SERVERS* mon_srv);
static MONITOR_OBJECT MyObject = { startMonitor, stopMonitor, registerServer, unregisterServer, defaultUser, diagnostics, setInterval, defaultId, replicationHeartbeat }; static MONITOR_OBJECT MyObject = { startMonitor, stopMonitor, registerServer, unregisterServer, defaultUser, diagnostics, setInterval, defaultId, replicationHeartbeat };
@ -180,7 +182,10 @@ MONITOR_SERVERS *ptr, *db;
db->server = server; db->server = server;
db->con = NULL; db->con = NULL;
db->next = NULL; db->next = NULL;
db->mon_err_count = 0;
db->mon_prev_status = 0;
spinlock_acquire(&handle->lock); spinlock_acquire(&handle->lock);
if (handle->databases == NULL) if (handle->databases == NULL)
handle->databases = db; handle->databases = db;
else else
@ -310,15 +315,17 @@ monitorDatabase(MYSQL_MONITOR *handle, MONITOR_SERVERS *database)
MYSQL_ROW row; MYSQL_ROW row;
MYSQL_RES *result; MYSQL_RES *result;
int num_fields; int num_fields;
int ismaster = 0, isslave = 0; int ismaster = 0;
char *uname = handle->defaultUser, *passwd = handle->defaultPasswd; int isslave = 0;
char *uname = handle->defaultUser;
char *passwd = handle->defaultPasswd;
unsigned long int server_version = 0; unsigned long int server_version = 0;
char *server_string; char *server_string;
unsigned long id = handle->id; unsigned long id = handle->id;
int replication_heartbeat = handle->replicationHeartbeat; int replication_heartbeat = handle->replicationHeartbeat;
static int conn_err_count; static int conn_err_count;
if (database->server->monuser != NULL) if (database->server->monuser != NULL)
{ {
uname = database->server->monuser; uname = database->server->monuser;
passwd = database->server->monpw; passwd = database->server->monpw;
@ -331,6 +338,9 @@ static int conn_err_count;
if (SERVER_IN_MAINT(database->server)) if (SERVER_IN_MAINT(database->server))
return; return;
/** Store previous status */
database->mon_prev_status = database->server->status;
if (database->con == NULL || mysql_ping(database->con) != 0) if (database->con == NULL || mysql_ping(database->con) != 0)
{ {
char *dpwd = decryptPassword(passwd); char *dpwd = decryptPassword(passwd);
@ -338,6 +348,7 @@ static int conn_err_count;
int read_timeout = 1; int read_timeout = 1;
database->con = mysql_init(NULL); database->con = mysql_init(NULL);
rc = mysql_options(database->con, MYSQL_OPT_READ_TIMEOUT, (void *)&read_timeout); rc = mysql_options(database->con, MYSQL_OPT_READ_TIMEOUT, (void *)&read_timeout);
if (mysql_real_connect(database->con, if (mysql_real_connect(database->con,
@ -349,7 +360,9 @@ static int conn_err_count;
NULL, NULL,
0) == NULL) 0) == NULL)
{ {
if (conn_err_count%10 == 0) free(dpwd);
if (mon_print_fail_status(database))
{ {
LOGIF(LE, (skygw_log_write_flush( LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR, LOGFILE_ERROR,
@ -359,17 +372,15 @@ static int conn_err_count;
database->server->port, database->server->port,
mysql_error(database->con)))); mysql_error(database->con))));
} }
conn_err_count += 1; /** Store current status */
free(dpwd);
server_clear_status(database->server, SERVER_RUNNING); server_clear_status(database->server, SERVER_RUNNING);
return; return;
} }
free(dpwd); free(dpwd);
} }
/** Store current status */
/* If we get this far then we have a working connection */ server_set_status(database->server, SERVER_RUNNING);
server_set_status(database->server, SERVER_RUNNING);
/* get server version from current server */ /* get server version from current server */
server_version = mysql_get_server_version(database->con); server_version = mysql_get_server_version(database->con);
@ -629,7 +640,7 @@ static int conn_err_count;
} }
} }
} }
/** Store current status */
if (ismaster) if (ismaster)
{ {
server_set_status(database->server, SERVER_MASTER); server_set_status(database->server, SERVER_MASTER);
@ -657,7 +668,6 @@ monitorMain(void *arg)
{ {
MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg; MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg;
MONITOR_SERVERS *ptr; MONITOR_SERVERS *ptr;
static int err_count;
if (mysql_thread_init()) if (mysql_thread_init())
{ {
@ -680,13 +690,10 @@ static int err_count;
ptr = handle->databases; ptr = handle->databases;
while (ptr) while (ptr)
{ {
unsigned int prev_status = ptr->server->status;
monitorDatabase(handle, ptr); monitorDatabase(handle, ptr);
if (ptr->server->status != prev_status || if (mon_status_changed(ptr) ||
(SERVER_IS_DOWN(ptr->server) && mon_print_fail_status(ptr))
err_count%10 == 0))
{ {
LOGIF(LM, (skygw_log_write_flush( LOGIF(LM, (skygw_log_write_flush(
LOGFILE_MESSAGE, LOGFILE_MESSAGE,
@ -697,7 +704,13 @@ static int err_count;
} }
if (SERVER_IS_DOWN(ptr->server)) if (SERVER_IS_DOWN(ptr->server))
{ {
err_count += 1; /** Increase this server'e error count */
ptr->mon_err_count += 1;
}
else
{
/** Reset this server's error count */
ptr->mon_err_count = 0;
} }
ptr = ptr->next; ptr = ptr->next;
} }
@ -743,3 +756,39 @@ replicationHeartbeat(void *arg, int replicationHeartbeat)
MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg; MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg;
memcpy(&handle->replicationHeartbeat, &replicationHeartbeat, sizeof(int)); memcpy(&handle->replicationHeartbeat, &replicationHeartbeat, sizeof(int));
} }
/**
 * Tell whether the status of a monitored server differs from the status
 * that was stored in mon_prev_status.
 *
 * @param mon_srv	monitored server entry
 *
 * @return true if the server's current status is not equal to
 * mon_prev_status, false otherwise.
 */
static bool mon_status_changed(
        MONITOR_SERVERS* mon_srv)
{
        return (mon_srv->mon_prev_status != mon_srv->server->status);
}
/**
 * Decide whether a failure of the monitored server should be logged now.
 *
 * The logging interval is derived from the server's error count: it is
 * 1 << (mon_err_count / 10), with the shift capped at 7. A failure is
 * reported only when the server is down and the error count is a
 * multiple of that interval.
 *
 * @param mon_srv	monitored server entry
 *
 * @return true if the failure should be logged, false otherwise.
 */
static bool mon_print_fail_status(
        MONITOR_SERVERS* mon_srv)
{
        int     nfailures = mon_srv->mon_err_count;
        /** Shift is capped at 7 so the interval always fits in uint8_t */
        uint8_t interval = 1<<(MIN(nfailures/10, 7));

        if (!SERVER_IS_DOWN(mon_srv->server))
        {
                return false;
        }
        return (nfailures%interval == 0);
}

View File

@ -42,6 +42,8 @@
typedef struct monitor_servers { typedef struct monitor_servers {
SERVER *server; /**< The server being monitored */ SERVER *server; /**< The server being monitored */
MYSQL *con; /**< The MySQL connection */ MYSQL *con; /**< The MySQL connection */
int mon_err_count;
unsigned int mon_prev_status;
struct monitor_servers struct monitor_servers
*next; /**< The next server in the list */ *next; /**< The next server in the list */
} MONITOR_SERVERS; } MONITOR_SERVERS;

View File

@ -65,7 +65,9 @@ static int gw_backend_hangup(DCB *dcb);
static int backend_write_delayqueue(DCB *dcb); static int backend_write_delayqueue(DCB *dcb);
static void backend_set_delayqueue(DCB *dcb, GWBUF *queue); static void backend_set_delayqueue(DCB *dcb, GWBUF *queue);
static int gw_change_user(DCB *backend_dcb, SERVER *server, SESSION *in_session, GWBUF *queue); static int gw_change_user(DCB *backend_dcb, SERVER *server, SESSION *in_session, GWBUF *queue);
static int gw_session(DCB *backend_dcb, void *data); #if defined(NOT_USED)
static int gw_session(DCB *backend_dcb, void *data);
#endif
static MYSQL_session* gw_get_shared_session_auth_info(DCB* dcb); static MYSQL_session* gw_get_shared_session_auth_info(DCB* dcb);
static GWPROTOCOL MyObject = { static GWPROTOCOL MyObject = {
@ -401,8 +403,12 @@ static int gw_read_backend_event(DCB *dcb) {
SESSION *session = dcb->session; SESSION *session = dcb->session;
CHK_SESSION(session); CHK_SESSION(session);
/* read available backend data */ router = session->service->router;
rc = dcb_read(dcb, &writebuf); router_instance = session->service->router_instance;
rsession = session->router_session;
/* read available backend data */
rc = dcb_read(dcb, &writebuf);
if (rc < 0) { if (rc < 0) {
/*< vraa : errorHandle */ /*< vraa : errorHandle */
@ -412,7 +418,24 @@ static int gw_read_backend_event(DCB *dcb) {
* dcb from getting hanged. * dcb from getting hanged.
*/ */
#if defined(ERRHANDLE) #if defined(ERRHANDLE)
dcb_close(dcb); bool succp;
/**
* - send error for client
* - mark failed backend BREF_NOT_USED
* - go through all servers and select one according to
* the criteria that user specified in the beginning.
*/
router->handleError(router_instance,
rsession,
"Read from backend failed.",
dcb,
ERRACT_NEW_CONNECTION,
&succp);
if (!succp)
{
dcb_close(dcb);
}
#else #else
(dcb->func).close(dcb); (dcb->func).close(dcb);
#endif #endif
@ -424,9 +447,6 @@ static int gw_read_backend_event(DCB *dcb) {
rc = 0; rc = 0;
goto return_rc; goto return_rc;
} }
router = session->service->router;
router_instance = session->service->router_instance;
rsession = session->router_session;
/* Note the gwbuf doesn't have here a valid queue->command /* Note the gwbuf doesn't have here a valid queue->command
* descriptions as it is a fresh new one! * descriptions as it is a fresh new one!
@ -671,6 +691,9 @@ static int gw_error_backend_event(DCB *dcb) {
router = session->service->router; router = session->service->router;
router_instance = session->service->router_instance; router_instance = session->service->router_instance;
#if defined(ERRHANDLE2)
router->handleError();
#else
if (dcb->state != DCB_STATE_POLLING) { if (dcb->state != DCB_STATE_POLLING) {
/*< vraa : errorHandle */ /*< vraa : errorHandle */
/*< /*<
@ -720,6 +743,7 @@ static int gw_error_backend_event(DCB *dcb) {
router->closeSession(router_instance, rsession); router->closeSession(router_instance, rsession);
} }
#endif
return rc; return rc;
} }

View File

@ -804,7 +804,6 @@ int gw_read_client_event(DCB* dcb) {
dcb_close(dcb); dcb_close(dcb);
#else #else
SESSION_ROUTE_QUERY(session, read_buffer); SESSION_ROUTE_QUERY(session, read_buffer);
// router->routeQuery(router_instance, rsession, read_buffer);
LOGIF(LD, (skygw_log_write_flush( LOGIF(LD, (skygw_log_write_flush(
LOGFILE_DEBUG, LOGFILE_DEBUG,
"%lu [gw_read_client_event] Routed COM_QUIT to " "%lu [gw_read_client_event] Routed COM_QUIT to "
@ -824,6 +823,7 @@ int gw_read_client_event(DCB* dcb) {
* to router. * to router.
*/ */
rc = route_by_statement(session, read_buffer); rc = route_by_statement(session, read_buffer);
if (read_buffer != NULL) if (read_buffer != NULL)
{ {
/** add incomplete mysql packet to read queue */ /** add incomplete mysql packet to read queue */
@ -840,7 +840,7 @@ int gw_read_client_event(DCB* dcb) {
if (rc == 1) { if (rc == 1) {
rc = 0; /**< here '0' means success */ rc = 0; /**< here '0' means success */
} else { } else {
#if defined(ERRHANDLE2) #if defined(ERRHANDLE)
bool succp; bool succp;
LOGIF(LE, (skygw_log_write_flush( LOGIF(LE, (skygw_log_write_flush(
@ -848,40 +848,28 @@ int gw_read_client_event(DCB* dcb) {
"Error : Routing the query failed. " "Error : Routing the query failed. "
"Reselecting backends."))); "Reselecting backends.")));
/**
* Decide whether close router and its
* connections or just send an error to client
*/
router->handleError(router_instance, router->handleError(router_instance,
rsession, rsession,
"Query routing failed. " "Write to backend failed.",
"Query execution aborted. " dcb,
"Reselecting backend.", ERRACT_NEW_CONNECTION,
NULL,
ERRACT_RELECT_BACKENDS,
&succp); &succp);
if (!succp) if (!succp)
{ {
router->handleError(router_instance,
rsession,
"Connection to "
"backend lost.",
NULL,
ERRACT_CLOSE_RSES,
NULL);
LOGIF(LE, (skygw_log_write_flush( LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR, LOGFILE_ERROR,
"Error : Reselecting backend " "Error : Reselecting backend "
"servers failed."))); "servers failed.")));
dcb_close(dcb);
} }
else
{ LOGIF(LT, (skygw_log_write_flush(
LOGIF(LT, (skygw_log_write_flush( LOGFILE_TRACE,
LOGFILE_TRACE, "Reselected backend servers.")));
"Reselected backend servers.")));
}
#else #else
mysql_send_custom_error(dcb, mysql_send_custom_error(dcb,
1, 1,

View File

@ -91,6 +91,8 @@ static void handleError(
bool* succp); bool* succp);
static void print_error_packet(ROUTER_CLIENT_SES* rses, GWBUF* buf, DCB* dcb); static void print_error_packet(ROUTER_CLIENT_SES* rses, GWBUF* buf, DCB* dcb);
static int router_get_servercount(ROUTER_INSTANCE* router);
static int rses_get_max_slavecount(ROUTER_CLIENT_SES* rses, int router_nservers);
static uint8_t getCapabilities (ROUTER* inst, void* router_session); static uint8_t getCapabilities (ROUTER* inst, void* router_session);
@ -425,14 +427,12 @@ static void* newSession(
SESSION* session) SESSION* session)
{ {
backend_ref_t* backend_ref; /*< array of backend references (DCB,BACKEND,cursor) */ backend_ref_t* backend_ref; /*< array of backend references (DCB,BACKEND,cursor) */
backend_ref_t* master_ref = NULL; /*< pointer to selected master */ backend_ref_t* master_ref = NULL; /*< pointer to selected master */
BACKEND** b;
ROUTER_CLIENT_SES* client_rses = NULL; ROUTER_CLIENT_SES* client_rses = NULL;
ROUTER_INSTANCE* router = (ROUTER_INSTANCE *)router_inst; ROUTER_INSTANCE* router = (ROUTER_INSTANCE *)router_inst;
bool succp; bool succp;
int router_nservers = 0; /*< # of servers in total */ int router_nservers = 0; /*< # of servers in total */
int max_nslaves; /*< max # of slaves used in this session */ int max_nslaves; /*< max # of slaves used in this session */
int conf_max_nslaves; /*< value from configuration file */
int i; int i;
const int min_nservers = 1; /*< hard-coded for now */ const int min_nservers = 1; /*< hard-coded for now */
static uint64_t router_client_ses_seq; /*< ID for client session */ static uint64_t router_client_ses_seq; /*< ID for client session */
@ -478,9 +478,7 @@ static void* newSession(
client_rses->rses_autocommit_enabled = true; client_rses->rses_autocommit_enabled = true;
client_rses->rses_transaction_active = false; client_rses->rses_transaction_active = false;
/** count servers */ router_nservers = router_get_servercount(router);
b = router->servers;
while (*(b++) != NULL) router_nservers++;
/** With too few servers session is not created */ /** With too few servers session is not created */
if (router_nservers < min_nservers || if (router_nservers < min_nservers ||
@ -558,6 +556,7 @@ static void* newSession(
backend_ref[i].bref_sescmd_cur.scmd_cur_chk_top = CHK_NUM_SESCMD_CUR; backend_ref[i].bref_sescmd_cur.scmd_cur_chk_top = CHK_NUM_SESCMD_CUR;
backend_ref[i].bref_sescmd_cur.scmd_cur_chk_tail = CHK_NUM_SESCMD_CUR; backend_ref[i].bref_sescmd_cur.scmd_cur_chk_tail = CHK_NUM_SESCMD_CUR;
#endif #endif
backend_ref[i].bref_state = BREF_NOT_USED;
backend_ref[i].bref_backend = router->servers[i]; backend_ref[i].bref_backend = router->servers[i];
/** store pointers to sescmd list to both cursors */ /** store pointers to sescmd list to both cursors */
backend_ref[i].bref_sescmd_cur.scmd_cur_rses = client_rses; backend_ref[i].bref_sescmd_cur.scmd_cur_rses = client_rses;
@ -565,22 +564,8 @@ static void* newSession(
backend_ref[i].bref_sescmd_cur.scmd_cur_ptr_property = backend_ref[i].bref_sescmd_cur.scmd_cur_ptr_property =
&client_rses->rses_properties[RSES_PROP_TYPE_SESCMD]; &client_rses->rses_properties[RSES_PROP_TYPE_SESCMD];
backend_ref[i].bref_sescmd_cur.scmd_cur_cmd = NULL; backend_ref[i].bref_sescmd_cur.scmd_cur_cmd = NULL;
} }
/** max_nslaves = rses_get_max_slavecount(client_rses, router_nservers);
* Find out the number of read backend servers.
* Depending on the configuration value type, either copy direct count
* of slave connections or calculate the count from percentage value.
*/
if (client_rses->rses_config.rw_max_slave_conn_count > 0)
{
conf_max_nslaves = client_rses->rses_config.rw_max_slave_conn_count;
}
else
{
conf_max_nslaves =
(router_nservers*client_rses->rses_config.rw_max_slave_conn_percent)/100;
}
max_nslaves = MIN(router_nservers-1, MAX(1, conf_max_nslaves));
spinlock_init(&client_rses->rses_lock); spinlock_init(&client_rses->rses_lock);
client_rses->rses_backend_ref = backend_ref; client_rses->rses_backend_ref = backend_ref;
@ -770,7 +755,10 @@ static void freeSession(
return; return;
} }
/**
* Provide a pointer to a suitable backend dcb.
* Detect failures in server statuses and reselect backends if necessary.
*/
static bool get_dcb( static bool get_dcb(
DCB** p_dcb, DCB** p_dcb,
ROUTER_CLIENT_SES* rses, ROUTER_CLIENT_SES* rses,
@ -1401,10 +1389,10 @@ static bool select_connect_backend_servers(
select_criteria_t select_criteria, select_criteria_t select_criteria,
SESSION* session, SESSION* session,
ROUTER_INSTANCE* router) ROUTER_INSTANCE* router)
{ {
bool succp = true; bool succp = true;
bool master_found = false; bool master_found;
bool master_connected = false; bool master_connected;
int slaves_found = 0; int slaves_found = 0;
int slaves_connected = 0; int slaves_connected = 0;
int i; int i;
@ -1418,6 +1406,20 @@ static bool select_connect_backend_servers(
succp = false; succp = false;
goto return_succp; goto return_succp;
} }
/** Master is already chosen and connected. This is slave failure case */
if (*p_master_ref != NULL &&
(*p_master_ref)->bref_state == BREF_IN_USE)
{
master_found = true;
master_connected = true;
}
/** New session or master failure case */
else
{
master_found = false;
master_connected = false;
}
/** Check slave selection criteria and set compare function */ /** Check slave selection criteria and set compare function */
p = criteria_cmpfun[select_criteria]; p = criteria_cmpfun[select_criteria];
@ -1502,7 +1504,7 @@ static bool select_connect_backend_servers(
i++) i++)
{ {
BACKEND* b = backend_ref[i].bref_backend; BACKEND* b = backend_ref[i].bref_backend;
LOGIF(LT, (skygw_log_write( LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE, LOGFILE_TRACE,
"Examine server " "Examine server "
@ -1514,6 +1516,7 @@ static bool select_connect_backend_servers(
b->backend_conn_count, b->backend_conn_count,
router->bitmask))); router->bitmask)));
if (SERVER_IS_RUNNING(b->backend_server) && if (SERVER_IS_RUNNING(b->backend_server) &&
((b->backend_server->status & router->bitmask) == ((b->backend_server->status & router->bitmask) ==
router->bitvalue)) router->bitvalue))
@ -1522,33 +1525,43 @@ static bool select_connect_backend_servers(
SERVER_IS_SLAVE(b->backend_server)) SERVER_IS_SLAVE(b->backend_server))
{ {
slaves_found += 1; slaves_found += 1;
backend_ref[i].bref_dcb = dcb_connect(
b->backend_server,
session,
b->backend_server->protocol);
if (backend_ref[i].bref_dcb != NULL) /** Slave is already connected */
if (backend_ref[i].bref_state == BREF_IN_USE)
{ {
slaves_connected += 1; slaves_connected += 1;
backend_ref[i].bref_state = BREF_IN_USE;
/**
* Increase backend connection counter.
* Server's stats are _increased_ in
* dcb.c:dcb_alloc !
* But decreased in the calling function
* of dcb_close.
*/
atomic_add(&b->backend_conn_count, 1);
} }
/** New slave connection is taking place */
else else
{ {
LOGIF(LE, (skygw_log_write_flush( backend_ref[i].bref_dcb = dcb_connect(
LOGFILE_ERROR, b->backend_server,
"Error : Unable to establish " session,
"connection with slave %s:%d", b->backend_server->protocol);
b->backend_server->name,
b->backend_server->port))); if (backend_ref[i].bref_dcb != NULL)
/* handle connect error */ {
slaves_connected += 1;
backend_ref[i].bref_state = BREF_IN_USE;
/**
* Increase backend connection counter.
* Server's stats are _increased_ in
* dcb.c:dcb_alloc !
* But decreased in the calling function
* of dcb_close.
*/
atomic_add(&b->backend_conn_count, 1);
}
else
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Unable to establish "
"connection with slave %s:%d",
b->backend_server->name,
b->backend_server->port)));
/* handle connect error */
}
} }
} }
else if (!master_connected && else if (!master_connected &&
@ -1584,18 +1597,18 @@ static bool select_connect_backend_servers(
/* handle connect error */ /* handle connect error */
} }
} }
else }
{ else
succp = false; {
LOGIF(LE, (skygw_log_write_flush( succp = false;
LOGFILE_ERROR, LOGIF(LE, (skygw_log_write_flush(
"Error : Unable to establish " LOGFILE_ERROR,
"connection with server %s:%d, %s", "Error : Unable to establish "
b->backend_server->name, "connection with server %s:%d, %s",
b->backend_server->port, b->backend_server->name,
STRSRVSTATUS(b->backend_server)))); b->backend_server->port,
/* handle connect error */ STRSRVSTATUS(b->backend_server))));
} /* handle connect error */
} }
} /*< for */ } /*< for */
@ -2496,12 +2509,40 @@ static void handleError (
int action, int action,
bool *succp) bool *succp)
{ {
DCB* client_dcb = NULL; DCB* client_dcb = NULL;
SESSION* session = backend_dcb->session; SESSION* session = backend_dcb->session;
ROUTER_INSTANCE* router = (ROUTER_INSTANCE *)instance;
ROUTER_CLIENT_SES* rses = (ROUTER_CLIENT_SES *)router_session;
client_dcb = session->client; client_dcb = session->client;
CHK_DCB(client_dcb);
ss_dassert(client_dcb != NULL); switch (action) {
case ERRACT_NEW_CONNECTION:
{
int router_nservers;
int max_nslaves;
router_nservers = router_get_servercount(router);
max_nslaves = rses_get_max_slavecount(rses, router_nservers);
*succp = select_connect_backend_servers(
&rses->rses_master_ref,
rses->rses_backend_ref,
router_nservers,
max_nslaves,
rses->rses_config.rw_slave_select_criteria,
session,
router);
ss_dassert(*succp);
}
break;
default:
*succp = false;
break;
}
} }
static void print_error_packet( static void print_error_packet(
@ -2555,3 +2596,42 @@ static void print_error_packet(
while ((buf = gwbuf_consume(buf, GWBUF_LENGTH(buf))) != NULL); while ((buf = gwbuf_consume(buf, GWBUF_LENGTH(buf))) != NULL);
} }
} }
/**
 * Count the servers of a router instance.
 *
 * Walks router->servers until the terminating NULL entry is found.
 *
 * @param router	router instance
 *
 * @return number of non-NULL entries preceding the first NULL entry.
 */
static int router_get_servercount(
        ROUTER_INSTANCE* router)
{
        int       nservers = 0;
        BACKEND** cursor;

        for (cursor = router->servers; *cursor != NULL; cursor++)
        {
                nservers += 1;
        }
        return nservers;
}
/**
 * Calculate the maximum number of slave connections for a router session.
 *
 * If rw_max_slave_conn_count is set (greater than zero) it is used as
 * such. Otherwise the value is calculated from rw_max_slave_conn_percent
 * as a percentage of the total number of servers. The result is at least
 * one but never more than router_nservers-1.
 *
 * @param rses			router client session
 * @param router_nservers	total number of servers in the router
 *
 * @return maximum number of slaves used in this session.
 */
static int rses_get_max_slavecount(
        ROUTER_CLIENT_SES* rses,
        int                router_nservers)
{
        int configured;

        CHK_CLIENT_RSES(rses);

        configured = (rses->rses_config.rw_max_slave_conn_count > 0) ?
                rses->rses_config.rw_max_slave_conn_count :
                (router_nservers*rses->rses_config.rw_max_slave_conn_percent)/100;

        return MIN(router_nservers-1, MAX(1, configured));
}