Snapshot of failure tolerance changes.

Added a lot of logging to error, trace and message logs which should help the user to handle errors which can't be automatically resolved, like attempt to use nonexisting database.
This commit is contained in:
VilhoRaatikka
2014-06-06 23:32:04 +03:00
parent 7bca4e383f
commit 7e6cb7afc2
19 changed files with 518 additions and 169 deletions

View File

@ -116,9 +116,7 @@ skygw_query_type_t skygw_query_classifier_get_type(
query_str = const_cast<char*>(query);
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"%lu [skygw_query_classifier_get_type] Query : \"%s\"",
pthread_self(),
query_str)));
"Query : \"%s\"", query_str)));
/** Get server handle */
mysql = mysql_init(NULL);

View File

@ -213,9 +213,9 @@ getUsers(SERVICE *service, struct users *users)
"Exiting.")));
return -1;
}
/*
* Attempt to connect to each database in the service in turn until
* we find one that we can connect to or until we run out of databases
/**
* Attempt to connect to one of the databases database or until we run
* out of databases
* to try
*/
server = service->databases;
@ -229,17 +229,6 @@ getUsers(SERVICE *service, struct users *users)
NULL,
0) == NULL))
{
if (server == NULL)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Unable to connect to %s:%d, \"%s\"",
server->name,
server->port,
mysql_error(con))));
mysql_close(con);
return -1;
}
server = server->nextdb;
}
free(dpwd);

View File

@ -559,6 +559,7 @@ int rc;
dcb->fd = fd;
/** Copy status field to DCB */
dcb->dcb_server_status = server->status;
ss_debug(dcb->dcb_port = server->port;)
/*<
* backend_dcb is connected to backend server, and once backend_dcb
@ -937,6 +938,7 @@ int above_water;
above_water = (dcb->low_water && dcb->writeqlen > dcb->low_water) ? 1 : 0;
spinlock_acquire(&dcb->writeqlock);
if (dcb->writeq)
{
int len;
@ -996,16 +998,17 @@ int above_water;
}
spinlock_release(&dcb->writeqlock);
atomic_add(&dcb->writeqlen, -n);
/* The write queue has drained, potentially need to call a callback function */
if (dcb->writeq == NULL)
dcb_call_callback(dcb, DCB_REASON_DRAINED);
if (above_water && dcb->writeqlen < dcb->low_water)
{
atomic_add(&dcb->stats.n_low_water, 1);
dcb_call_callback(dcb, DCB_REASON_LOW_WATER);
}
return n;
}
@ -1030,7 +1033,8 @@ dcb_close(DCB *dcb)
* dcb_close may be called for freshly created dcb, in which case
* it only needs to be freed.
*/
if (dcb->state == DCB_STATE_ALLOC) {
if (dcb->state == DCB_STATE_ALLOC)
{
dcb_set_state(dcb, DCB_STATE_DISCONNECTED, NULL);
dcb_final_free(dcb);
return;
@ -1048,6 +1052,16 @@ dcb_close(DCB *dcb)
ss_dassert(dcb->state == DCB_STATE_NOPOLLING ||
dcb->state == DCB_STATE_ZOMBIE);
#if defined(ERRHANDLE)
/**
* close protocol and router session
*/
if (dcb->func.close != NULL)
{
dcb->func.close(dcb);
}
#endif
dcb_call_callback(dcb, DCB_REASON_CLOSE);
if (rc == 0) {
@ -1068,7 +1082,8 @@ dcb_close(DCB *dcb)
STRDCBSTATE(dcb->state))));
}
if (dcb->state == DCB_STATE_NOPOLLING) {
if (dcb->state == DCB_STATE_NOPOLLING)
{
dcb_add_to_zombieslist(dcb);
}
}

View File

@ -349,8 +349,8 @@ poll_waitevents(void *arg)
ss_dassert(dcb->state != DCB_STATE_FREED);
ss_debug(spinlock_release(&dcb->dcb_initlock);)
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] event %d dcb %p "
"role %s",
pthread_self(),

View File

@ -991,3 +991,9 @@ static void service_add_qualified_param(
(*p)->next = NULL;
spinlock_release(&svc->spin);
}
char* service_get_name(
SERVICE* svc)
{
return svc->name;
}

View File

@ -23,6 +23,8 @@
#include <skygw_utils.h>
#include <netinet/in.h>
// #define ERRHANDLE
struct session;
struct server;
struct service;
@ -222,6 +224,7 @@ typedef struct dcb {
unsigned int high_water; /**< High water mark */
unsigned int low_water; /**< Low water mark */
#if defined(SS_DEBUG)
int dcb_port; /**< port of target server */
skygw_chk_t dcb_chk_tail;
#endif
} DCB;

View File

@ -74,7 +74,13 @@ typedef struct router_object {
int (*routeQuery)(ROUTER *instance, void *router_session, GWBUF *queue);
void (*diagnostics)(ROUTER *instance, DCB *dcb);
void (*clientReply)(ROUTER* instance, void* router_session, GWBUF* queue, DCB *backend_dcb);
void (*errorReply)(ROUTER* instance, void* router_session, char* message, DCB *backend_dcb, int action);
void (*handleError)(
ROUTER* instance,
void* router_session,
char* message,
DCB *backend_dcb,
int action,
bool* succp);
uint8_t (*getCapabilities)(ROUTER *instance, void* router_session);
} ROUTER_OBJECT;

View File

@ -77,6 +77,8 @@ typedef struct server {
#define SERVER_MASTER 0x0002 /**<< The server is a master, i.e. can handle writes */
#define SERVER_SLAVE 0x0004 /**<< The server is a slave, i.e. can handle reads */
#define SERVER_JOINED 0x0008 /**<< The server is joined in a Galera cluster */
#define SERVER_MAINT 0x1000 /**<< Server is in maintenance mode */
/**
* Is the server running - the macro returns true if the server is marked as running
@ -107,6 +109,12 @@ typedef struct server {
#define SERVER_IS_JOINED(server) \
(((server)->status & (SERVER_RUNNING|SERVER_JOINED)) == (SERVER_RUNNING|SERVER_JOINED))
/**
* Is the server in maintenance mode.
*/
#define SERVER_IN_MAINT(server) ((server)->status & SERVER_MAINT)
extern SERVER *server_alloc(char *, char *, unsigned short);
extern int server_free(SERVER *);
extern SERVER *server_find_by_unique_name(char *);
@ -121,4 +129,6 @@ extern void server_set_status(SERVER *, int);
extern void server_clear_status(SERVER *, int);
extern void serverAddMonUser(SERVER *, char *, char *);
extern void server_update(SERVER *, char *, char *, char *);
void server_set_unique_name(SERVER *server, char *name);
#endif

View File

@ -162,4 +162,5 @@ bool service_set_slave_conn_limit (
extern void dprintService(DCB *, SERVICE *);
extern void dListServices(DCB *);
extern void dListListeners(DCB *);
char* service_get_name(SERVICE* svc);
#endif

View File

@ -88,7 +88,7 @@
#define SMALL_CHUNK 1024
#define MAX_CHUNK SMALL_CHUNK * 8 * 4
#define ToHex(Y) (Y>='0'&&Y<='9'?Y-'0':Y-'A'+10)
#define COM_QUIT_PACKET_SIZE (4+1)
struct dcb;
typedef enum {
@ -104,7 +104,6 @@ typedef enum {
MYSQL_SESSION_CHANGE
} mysql_pstate_t;
/*
* MySQL Protocol specific state data
*/
@ -257,6 +256,7 @@ int gw_send_authentication_to_backend(
MySQLProtocol *protocol);
const char *gw_mysql_protocol_state2string(int state);
int gw_do_connect_to_backend(char *host, int port, int* fd);
int mysql_send_com_quit(DCB* dcb, int packet_number);
int mysql_send_custom_error (
DCB *dcb,
int packet_number,

View File

@ -31,6 +31,12 @@
#include <dcb.h>
typedef enum bref_state {
BREF_NOT_USED,
BREF_IN_USE,
BREF_CLOSED
} bref_state_t;
typedef enum backend_type_t {
BE_UNDEFINED=-1,
BE_MASTER,
@ -159,6 +165,7 @@ typedef struct backend_ref_st {
#endif
BACKEND* bref_backend;
DCB* bref_dcb;
bref_state_t bref_state;
sescmd_cursor_t bref_sescmd_cur;
#if defined(SS_DEBUG)
skygw_chk_t bref_chk_tail;

View File

@ -294,18 +294,27 @@ int ismaster = 0, isslave = 0;
char *uname = defaultUser, *passwd = defaultPasswd;
unsigned long int server_version = 0;
char *server_string;
static int conn_err_count;
static int modval = 10;
if (database->server->monuser != NULL)
{
uname = database->server->monuser;
passwd = database->server->monpw;
}
if (uname == NULL)
return;
if (database->con == NULL || mysql_ping(database->con) != 0)
{
char *dpwd = decryptPassword(passwd);
int rc;
int read_timeout = 1;
database->con = mysql_init(NULL);
rc = mysql_options(database->con, MYSQL_OPT_READ_TIMEOUT, (void *)&read_timeout);
if (mysql_real_connect(database->con,
database->server->name,
uname,
@ -315,8 +324,25 @@ char *server_string;
NULL,
0) == NULL)
{
if (conn_err_count%modval == 0)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Monitor was unable to connect to "
"server %s:%d : \"%s\"",
database->server->name,
database->server->port,
mysql_error(database->con))));
conn_err_count = 0;
modval += 1;
}
else
{
conn_err_count += 1;
}
free(dpwd);
server_clear_status(database->server, SERVER_RUNNING);
return;
}
free(dpwd);
@ -428,7 +454,6 @@ char *server_string;
server_clear_status(database->server, SERVER_SLAVE);
server_clear_status(database->server, SERVER_MASTER);
}
}
/**
@ -441,6 +466,8 @@ monitorMain(void *arg)
{
MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg;
MONITOR_SERVERS *ptr;
static int err_count;
static int modval = 10;
if (mysql_thread_init())
{
@ -463,7 +490,27 @@ MONITOR_SERVERS *ptr;
ptr = handle->databases;
while (ptr)
{
unsigned int prev_status = ptr->server->status;
monitorDatabase(ptr, handle->defaultUser, handle->defaultPasswd);
if (ptr->server->status != prev_status ||
(SERVER_IS_DOWN(ptr->server) &&
err_count%modval == 0))
{
LOGIF(LM, (skygw_log_write_flush(
LOGFILE_MESSAGE,
"Backend server %s:%d state : %s",
ptr->server->name,
ptr->server->port,
STRSRVSTATUS(ptr->server))));
err_count = 0;
modval += 1;
}
else if (SERVER_IS_DOWN(ptr->server))
{
err_count += 1;
}
ptr = ptr->next;
}
thread_millisleep(10000);

View File

@ -71,7 +71,7 @@ static GWPROTOCOL MyObject = {
gw_backend_close, /* Close */
NULL, /* Listen */
gw_change_user, /* Authentication */
gw_session /* Session */
NULL /* Session */
};
/*
@ -403,7 +403,11 @@ static int gw_read_backend_event(DCB *dcb) {
* failed, connection must be closed to avoid backend
* dcb from getting hanged.
*/
#if defined(ERRHANDLE)
dcb_close(dcb);
#else
(dcb->func).close(dcb);
#endif
rc = 0;
goto return_rc;
}
@ -435,6 +439,7 @@ static int gw_read_backend_event(DCB *dcb) {
if (client_protocol->state == MYSQL_IDLE)
{
gwbuf_set_type(writebuf, GWBUF_TYPE_MYSQL);
router->clientReply(router_instance,
rsession,
writebuf,
@ -443,6 +448,7 @@ static int gw_read_backend_event(DCB *dcb) {
}
goto return_rc;
} else if (dcb->session->client->dcb_role == DCB_ROLE_INTERNAL) {
gwbuf_set_type(writebuf, GWBUF_TYPE_MYSQL);
router->clientReply(router_instance, rsession, writebuf, dcb);
rc = 1;
}
@ -542,6 +548,8 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
MySQLProtocol *backend_protocol = dcb->protocol;
int rc = 0;
ss_dassert(dcb->state == DCB_STATE_POLLING);
#if !defined(ERRHANDLE)
/*<
* Don't write to backend if backend_dcb is not in poll set anymore.
*/
@ -565,6 +573,7 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
goto return_rc;
}
spinlock_release(&dcb->dcb_initlock);
#endif
spinlock_acquire(&dcb->authlock);
/**
* Pick action according to state of protocol.
@ -608,6 +617,7 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
dcb->fd,
STRPROTOCOLSTATE(backend_protocol->state))));
spinlock_release(&dcb->authlock);
rc = dcb_write(dcb, queue);
goto return_rc;
break;
@ -816,16 +826,18 @@ gw_backend_hangup(DCB *dcb)
}
/**
* Close the backend dcb
*
* Send COM_QUIT to backend so that it can be closed.
* @param dcb The current Backend DCB
* @return 1 always
*/
static int
gw_backend_close(DCB *dcb)
{
/*< vraa : errorHandle */
#if defined(ERRHANDLE)
mysql_send_com_quit(dcb, 1);
#else
dcb_close(dcb);
#endif
return 1;
}
@ -903,7 +915,12 @@ static int backend_write_delayqueue(DCB *dcb)
static int gw_change_user(DCB *backend, SERVER *server, SESSION *in_session, GWBUF *queue) {
static int gw_change_user(
DCB *backend,
SERVER *server,
SESSION *in_session,
GWBUF *queue)
{
MYSQL_session *current_session = NULL;
MySQLProtocol *backend_protocol = NULL;
MySQLProtocol *client_protocol = NULL;
@ -989,6 +1006,7 @@ static int gw_change_user(DCB *backend, SERVER *server, SESSION *in_session, GWB
* @param
* @return always 1
*/
/*
static int gw_session(DCB *backend_dcb, void *data) {
GWBUF *queue = NULL;
@ -998,3 +1016,4 @@ static int gw_session(DCB *backend_dcb, void *data) {
return 1;
}
*/

View File

@ -598,7 +598,8 @@ int gw_read_client_event(DCB* dcb) {
else
{
/**
* There is at least one complete mysql packet read
* There is at least one complete mysql packet in
* read_buffer.
*/
read_buffer = dcb->dcb_readqueue;
dcb->dcb_readqueue = NULL;
@ -623,58 +624,70 @@ int gw_read_client_event(DCB* dcb) {
switch (protocol->state) {
case MYSQL_AUTH_SENT:
/*
* Read all the data that is available into a chain of buffers
*/
{
int auth_val = -1;
auth_val = gw_mysql_do_authentication(dcb, read_buffer);
// Data handled withot the dcb->func.write
// so consume it now
// be sure to consume it all
read_buffer = gwbuf_consume(read_buffer, nbytes_read);
ss_dassert(read_buffer == NULL || GWBUF_EMPTY(read_buffer));
if (auth_val == 0)
{
SESSION *session = NULL;
protocol->state = MYSQL_AUTH_RECV;
//write to client mysql AUTH_OK packet, packet n. is 2
// start a new session, and connect to backends
/**
* Create session, and a router session for it.
* If successful, there will be backend connection(s)
* after this point.
*/
session = session_alloc(dcb->service, dcb);
if (session != NULL) {
if (session != NULL)
{
CHK_SESSION(session);
ss_dassert(session->state != SESSION_STATE_ALLOC);
protocol->state = MYSQL_IDLE;
/**
* Send an AUTH_OK packet to the client,
* packet sequence is # 2
*/
mysql_send_ok(dcb, 2, 0, NULL);
} else {
}
else
{
protocol->state = MYSQL_AUTH_FAILED;
/** Send ERR 1045 to client */
mysql_send_auth_error(
dcb,
2,
0,
"failed to create new session");
#if defined(ERRHANDLE)
dcb_close(dcb);
#else
dcb->func.close(dcb);
#endif
}
}
else
{
protocol->state = MYSQL_AUTH_FAILED;
/** Send ERR 1045 to client */
mysql_send_auth_error(
dcb,
2,
0,
"Authorization failed");
#if defined(ERRHANDLE)
dcb_close(dcb);
#else
dcb->func.close(dcb);
#endif
}
}
break;
case MYSQL_IDLE:
/*
* Read all the data that is available into a chain of buffers
*/
{
uint8_t cap = 0;
uint8_t *ptr_buff = NULL;
@ -682,14 +695,16 @@ int gw_read_client_event(DCB* dcb) {
bool stmt_input; /*< router input type */
session = dcb->session;
ss_dassert( session!= NULL);
// get the backend session, if available
if (session != NULL) {
if (session != NULL)
{
CHK_SESSION(session);
router = session->service->router;
router_instance =
session->service->router_instance;
rsession = session->router_session;
ss_dassert(rsession != NULL);
}
/* Now, we are assuming in the first buffer there is
@ -706,9 +721,11 @@ int gw_read_client_event(DCB* dcb) {
* COM_QUIT : close client dcb
* else : write custom error to client dcb.
*/
if(rsession == NULL) {
if(rsession == NULL)
{
/** COM_QUIT */
if (mysql_command == '\x01') {
if (mysql_command == '\x01')
{
LOGIF(LD, (skygw_log_write_flush(
LOGFILE_DEBUG,
"%lu [gw_read_client_event] Client read "
@ -716,8 +733,18 @@ int gw_read_client_event(DCB* dcb) {
"client dcb %p.",
pthread_self(),
dcb)));
#if defined(ERRHANDLE)
/**
* close router session and that closes
* backends
*/
dcb_close(dcb);
#else
(dcb->func).close(dcb);
} else {
#endif
}
else
{
/* Send a custom error as MySQL command reply */
mysql_send_custom_error(
dcb,
@ -732,6 +759,7 @@ int gw_read_client_event(DCB* dcb) {
read_buffer = gwbuf_consume(read_buffer, nbytes_read);
goto return_rc;
}
/** Ask what type of input the router expects */
cap = router->getCapabilities(router_instance, rsession);
@ -752,7 +780,6 @@ int gw_read_client_event(DCB* dcb) {
"%lu [gw_read_client_event] Reading router "
"capabilities failed.",
pthread_self())));
mysql_send_custom_error(dcb,
1,
0,
@ -762,9 +789,16 @@ int gw_read_client_event(DCB* dcb) {
goto return_rc;
}
/** Route COM_QUIT to backend */
if (mysql_command == '\x01') {
#if defined(ERRHANDLE)
/**
* Close router session and that closes
* backends.
* Closing backends includes sending COM_QUIT packets.
*/
dcb_close(dcb);
#else
router->routeQuery(router_instance, rsession, read_buffer);
LOGIF(LD, (skygw_log_write_flush(
LOGFILE_DEBUG,
@ -774,6 +808,7 @@ int gw_read_client_event(DCB* dcb) {
dcb)));
/** close client connection, closes router session too */
rc = dcb->func.close(dcb);
#endif
}
else
{
@ -805,6 +840,49 @@ int gw_read_client_event(DCB* dcb) {
if (rc == 1) {
rc = 0; /**< here '0' means success */
} else {
#if defined(ERRHANDLE2)
bool succp;
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Routing the query failed. "
"Reselecting backends.")));
/**
* Decide whether close router and its
* connections or just send an error to client
*/
router->handleError(router_instance,
rsession,
"Query routing failed. "
"Query execution aborted. "
"Reselecting backend.",
NULL,
ERRACT_RELECT_BACKENDS,
&succp);
if (!succp)
{
router->handleError(router_instance,
rsession,
"Connection to "
"backend lost.",
NULL,
ERRACT_CLOSE_RSES,
NULL);
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Reselecting backend "
"servers failed.")));
}
else
{
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"Reselected backend servers.")));
}
#else
mysql_send_custom_error(dcb,
1,
0,
@ -812,6 +890,7 @@ int gw_read_client_event(DCB* dcb) {
"Connection to backend "
"lost.");
protocol->state = MYSQL_IDLE;
#endif
}
}
goto return_rc;
@ -1171,23 +1250,31 @@ int gw_MySQLAccept(DCB *listener)
client_dcb->fd = c_sock;
// get client address
if ( client_conn.sa_family == AF_UNIX) {
if ( client_conn.sa_family == AF_UNIX)
{
// client address
client_dcb->remote = strdup("localhost_from_socket");
// set localhost IP for user authentication
(client_dcb->ipv4).sin_addr.s_addr = 0x0100007F;
} else {
}
else
{
/* client IPv4 in raw data*/
memcpy(&client_dcb->ipv4, (struct sockaddr_in *)&client_conn, sizeof(struct sockaddr_in));
memcpy(&client_dcb->ipv4,
(struct sockaddr_in *)&client_conn,
sizeof(struct sockaddr_in));
/* client IPv4 in string representation */
client_dcb->remote = (char *)calloc(INET_ADDRSTRLEN+1, sizeof(char));
if (client_dcb->remote != NULL) {
inet_ntop(AF_INET, &(client_dcb->ipv4).sin_addr, client_dcb->remote, INET_ADDRSTRLEN);
}
}
if (client_dcb->remote != NULL)
{
inet_ntop(AF_INET,
&(client_dcb->ipv4).sin_addr,
client_dcb->remote,
INET_ADDRSTRLEN);
}
}
protocol = mysql_protocol_init(client_dcb, c_sock);
ss_dassert(protocol != NULL);
if (protocol == NULL) {
@ -1224,7 +1311,7 @@ int gw_MySQLAccept(DCB *listener)
0,
"MaxScale internal error.");
/** delete client_dcb */
/** close client_dcb */
dcb_close(client_dcb);
/** Previous state is recovered in poll_add_dcb. */
@ -1265,10 +1352,13 @@ static int gw_error_client_event(
int rc;
CHK_DCB(dcb);
#if defined(ERRHANDLE)
dcb_close(dcb);
return 1;
#else
rc = dcb->func.close(dcb);
return rc;
#endif
}
static int
@ -1305,8 +1395,10 @@ gw_client_close(DCB *dcb)
router->closeSession(router_instance, rsession);
}
#if !defined(ERRHANDLE)
/** close client DCB */
dcb_close(dcb);
#endif
return 1;
}
@ -1324,9 +1416,14 @@ gw_client_hangup_event(DCB *dcb)
int rc;
CHK_DCB(dcb);
#if defined(ERRHANDLE)
dcb_close(dcb);
return 1;
#else
rc = dcb->func.close(dcb);
return rc;
#endif
}

View File

@ -321,26 +321,27 @@ int gw_receive_backend_auth(
}
else if (ptr[4] == 0xff)
{
size_t packetlen = MYSQL_GET_PACKET_LEN(ptr)+4;
char* bufstr = (char *)calloc(1, packetlen-3);
snprintf(bufstr, packetlen-6, "%s", &ptr[7]);
size_t len = MYSQL_GET_PACKET_LEN(ptr);
char* err = strndup(&ptr[8], 5);
char* bufstr = strndup(&ptr[13], len-4-5);
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_receive_backend_auth] Invalid "
"authentication message from backend dcb %p "
"fd %d, ptr[4] = %p, msg %s.",
"fd %d, ptr[4] = %p, error %s, msg %s.",
pthread_self(),
dcb,
dcb->fd,
ptr[4],
err,
bufstr)));
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Invalid authentication message "
"from backend. Msg : %s",
"from backend. Error : %s, Msg : %s",
err,
bufstr)));
free(bufstr);
@ -367,7 +368,7 @@ int gw_receive_backend_auth(
/*<
* Remove data from buffer.
*/
head = gwbuf_consume(head, GWBUF_LENGTH(head));
while ((head = gwbuf_consume(head, GWBUF_LENGTH(head))) != NULL);
}
else if (n == 0)
{
@ -634,8 +635,8 @@ int gw_do_connect_to_backend(
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error: Establishing connection to backend server "
"%s:%d failed.\n\t\t Socket creation failed due "
"%d, %s.",
"%s:%d failed.\n\t\t Socket creation failed "
"due %d, %s.",
host,
port,
eno,
@ -736,6 +737,45 @@ gw_mysql_protocol_state2string (int state) {
}
}
int mysql_send_com_quit(
DCB* dcb,
int packet_number)
{
uint8_t *data;
GWBUF *buf;
int nbytes = 0;
CHK_DCB(dcb);
ss_dassert(packet_number <= 255);
if (dcb == NULL ||
(dcb->state != DCB_STATE_NOPOLLING &&
dcb->state != DCB_STATE_ZOMBIE))
{
return 0;
}
buf = gwbuf_alloc(COM_QUIT_PACKET_SIZE);
ss_dassert(buf != NULL);
if (buf == NULL)
{
return 0;
}
data = GWBUF_DATA(buf);
*data++ = 0x1;
*data++ = 0x0;
*data++ = 0x0;
*data++ = packet_number;
*data = 0x1;
nbytes = dcb->func.write(dcb, buf);
return nbytes;
}
/**
* mysql_send_custom_error
*
@ -1229,7 +1269,12 @@ int gw_find_mysql_user_password_sha1(char *username, uint8_t *gateway_password,
*
*/
int
mysql_send_auth_error (DCB *dcb, int packet_number, int in_affected_rows, const char* mysql_message) {
mysql_send_auth_error (
DCB *dcb,
int packet_number,
int in_affected_rows,
const char *mysql_message)
{
uint8_t *outbuf = NULL;
uint8_t mysql_payload_size = 0;
uint8_t mysql_packet_header[4];

View File

@ -58,12 +58,14 @@ static void GHACloseSession(ROUTER *instance, void *router_session);
static void GHAFreeSession(ROUTER *instance, void *router_session);
static int GHARouteQuery(ROUTER *instance, void *router_session, GWBUF *queue);
static void GHADiagnostics(ROUTER *instance, DCB *dcb);
static void GHAClientReply(
ROUTER *instance,
void *router_session,
GWBUF *queue,
DCB *backend_dcb);
static void GHAErrorReply(
static void GHAHandleError(
ROUTER *instance,
void *router_session,
char *message,
@ -79,7 +81,7 @@ static ROUTER_OBJECT MyObject = {
GHARouteQuery,
GHADiagnostics,
GHAClientReply,
GHAErrorReply
GHAHandleError
};
static bool rses_begin_router_action(
@ -630,10 +632,9 @@ GHAClientReply(
}
/**
* Error Reply routine
* Error handling routine
*
* The routine will reply to client errors and/or closing the session
* or try to open a new backend connection.
* The routine will handle error occurred in backend.
*
* @param instance The router instance
* @param router_session The router session
@ -643,7 +644,7 @@ GHAClientReply(
*
*/
static void
GHAErrorReply(
GHAHandleError(
ROUTER *instance,
void *router_session,
char *message,

View File

@ -102,12 +102,13 @@ static void clientReply(
void *router_session,
GWBUF *queue,
DCB *backend_dcb);
static void errorReply(
static void handleError(
ROUTER *instance,
void *router_session,
char *message,
DCB *backend_dcb,
int action);
int action,
bool *succp);
static uint8_t getCapabilities (ROUTER* inst, void* router_session);
@ -120,7 +121,7 @@ static ROUTER_OBJECT MyObject = {
routeQuery,
diagnostics,
clientReply,
errorReply,
handleError,
getCapabilities
};
@ -681,10 +682,9 @@ clientReply(
}
/**
* Error Reply routine
* Error Handler routine
*
* The routine will reply to client errors and/or closing the session
* or try to open a new backend connection.
* The routine will handle errors that occurred in backend writes.
*
* @param instance The router instance
* @param router_session The router session
@ -694,12 +694,13 @@ clientReply(
*
*/
static void
errorReply(
handleError(
ROUTER *instance,
void *router_session,
char *message,
DCB *backend_dcb,
int action)
int action,
bool *succp)
{
DCB *client = NULL;
SESSION *session = backend_dcb->session;

View File

@ -30,6 +30,9 @@
#include <query_classifier.h>
#include <dcb.h>
#include <spinlock.h>
#if defined(SS_DEBUG)
# include <mysql_client_server_protocol.h>
#endif
extern int lm_enabled_logfiles_bitmask;
@ -63,11 +66,23 @@ static void closeSession(ROUTER *instance, void *session);
static void freeSession(ROUTER *instance, void *session);
static int routeQuery(ROUTER *instance, void *session, GWBUF *queue);
static void diagnostic(ROUTER *instance, DCB *dcb);
static void clientReply(
ROUTER* instance,
void* router_session,
GWBUF* queue,
DCB* backend_dcb);
static void handleError(
ROUTER* instance,
void* router_session,
char* message,
DCB* backend_dcb,
int action,
bool* succp);
static void print_error_packet(ROUTER_CLIENT_SES* rses, GWBUF* buf, DCB* dcb);
static uint8_t getCapabilities (ROUTER* inst, void* router_session);
int bref_cmp_global_conn(
@ -118,7 +133,7 @@ static ROUTER_OBJECT MyObject = {
routeQuery,
diagnostic,
clientReply,
NULL,
handleError,
getCapabilities
};
static bool rses_begin_locked_router_action(
@ -658,11 +673,19 @@ static void closeSession(
DCB* dcb = backend_ref[i].bref_dcb;
/** Close those which had been connected */
if (dcb != NULL)
if (backend_ref[i].bref_state == BREF_IN_USE)
{
CHK_DCB(dcb);
backend_ref[i].bref_dcb = NULL; /*< prevent new uses of DCB */
backend_ref[i].bref_state = BREF_NOT_USED;
#if defined(ERRHANDLE)
/**
* closes protocol and dcb
*/
dcb_close(dcb);
#else
dcb->func.close(dcb);
#endif
/** decrease server current connection counters */
atomic_add(&backend_ref[i].bref_backend->backend_server->stats.n_current, -1);
atomic_add(&backend_ref[i].bref_backend->backend_conn_count, -1);
@ -688,7 +711,7 @@ static void freeSession(
for (i=0; i<router_cli_ses->rses_nbackends; i++)
{
if (backend_ref[i].bref_dcb == NULL)
if (backend_ref[i].bref_state != BREF_IN_USE)
{
continue;
}
@ -764,7 +787,7 @@ static bool get_dcb(
{
BACKEND* b = backend_ref[i].bref_backend;
if (backend_ref[i].bref_dcb != NULL &&
if (backend_ref[i].bref_state == BREF_IN_USE &&
SERVER_IS_SLAVE(b->backend_server) &&
(smallest_nconn == -1 ||
b->backend_conn_count < smallest_nconn))
@ -779,7 +802,7 @@ static bool get_dcb(
{
backend_ref = rses->rses_master_ref;
if (backend_ref->bref_dcb != NULL)
if (backend_ref[i].bref_state == BREF_IN_USE)
{
*p_dcb = backend_ref->bref_dcb;
succp = true;
@ -799,13 +822,13 @@ static bool get_dcb(
}
ss_dassert(succp);
}
else if (btype == BE_MASTER || BE_JOINED)
else if (btype == BE_MASTER)
{
for (i=0; i<rses->rses_nbackends; i++)
{
BACKEND* b = backend_ref[i].bref_backend;
if (backend_ref[i].bref_dcb != NULL &&
if (backend_ref[i].bref_state == BREF_IN_USE &&
(SERVER_IS_MASTER(b->backend_server) ||
SERVER_IS_JOINED(b->backend_server)))
{
@ -930,23 +953,7 @@ static int routeQuery(
default:
break;
} /**< switch by packet type */
#if 0
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE,
"String\t\"%s\"",
querystr == NULL ? "(empty)" : querystr)));
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE,
"Packet type\t%s",
STRPACKETTYPE(packet_type))));
#endif
#if defined(AUTOCOMMIT_OPT)
if ((QUERY_IS_TYPE(qtype, QUERY_TYPE_DISABLE_AUTOCOMMIT) &&
!router_cli_ses->rses_autocommit_enabled) ||
(QUERY_IS_TYPE(qtype, QUERY_TYPE_ENABLE_AUTOCOMMIT) &&
router_cli_ses->rses_autocommit_enabled))
{
/** reply directly to client */
}
#endif
/**
* If autocommit is disabled or transaction is explicitly started
* transaction becomes active and master gets all statements until
@ -1128,6 +1135,7 @@ static bool rses_begin_locked_router_action(
CHK_CLIENT_RSES(rses);
if (rses->rses_closed) {
goto return_succp;
}
spinlock_acquire(&rses->rses_lock);
@ -1138,10 +1146,6 @@ static bool rses_begin_locked_router_action(
succp = true;
return_succp:
if (!succp)
{
/** log that router session was closed */
}
return succp;
}
@ -1242,9 +1246,7 @@ static void clientReply(
*/
if (!rses_begin_locked_router_action(router_cli_ses))
{
while ((writebuf = gwbuf_consume(
writebuf,
GWBUF_LENGTH(writebuf))) != NULL);
print_error_packet(router_cli_ses, writebuf, backend_dcb);
goto lock_failed;
}
/** Holding lock ensures that router session remains open */
@ -1311,14 +1313,9 @@ static void clientReply(
{
/** Write reply to client DCB */
client_dcb->func.write(client_dcb, writebuf);
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"%lu [clientReply:rwsplit] client dcb %p, "
"backend dcb %p. End of normal reply.",
pthread_self(),
client_dcb,
backend_dcb)));
/**
* Log reply but use identifier for query
*/
}
lock_failed:
@ -1385,6 +1382,8 @@ int bref_cmp_behind_master(
*
* @details It is assumed that there is only one master among servers of
* a router instance. As a result, the first master found is chosen.
* There will possibly be more backend references than connected backends
* because only those in correct state are connected to.
*/
static bool select_connect_backend_servers(
backend_ref_t** p_master_ref,
@ -1429,7 +1428,7 @@ static bool select_connect_backend_servers(
is_synced_master = false;
}
#if 0
#if defined(EXTRA_DEBUGGING)
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, "Servers and conns before ordering:")));
for (i=0; i<router_nservers; i++)
{
@ -1498,14 +1497,13 @@ static bool select_connect_backend_servers(
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"%lu [select_backend_servers] Examine server "
"%s:%d with %d connections. Status is %d, "
"Examine server "
"%s:%d %s with %d connections. "
"router->bitvalue is %d",
pthread_self(),
b->backend_server->name,
b->backend_server->port,
STRSRVSTATUS(b->backend_server),
b->backend_conn_count,
b->backend_server->status,
router->bitmask)));
if (SERVER_IS_RUNNING(b->backend_server) &&
@ -1524,6 +1522,7 @@ static bool select_connect_backend_servers(
if (backend_ref[i].bref_dcb != NULL)
{
slaves_connected += 1;
backend_ref[i].bref_state = BREF_IN_USE;
/**
* Increase backend connection counter.
* Server's stats are _increased_ in
@ -1558,6 +1557,7 @@ static bool select_connect_backend_servers(
if (backend_ref[i].bref_dcb != NULL)
{
master_connected = true;
backend_ref[i].bref_state = BREF_IN_USE;
*p_master_ref = &backend_ref[i];
/** Increase backend connection counter */
/** Increase backend connection counter */
@ -1577,6 +1577,18 @@ static bool select_connect_backend_servers(
/* handle connect error */
}
}
else
{
succp = false;
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Unable to establish "
"connection with server %s:%d, %s",
b->backend_server->name,
b->backend_server->port,
STRSRVSTATUS(b->backend_server))));
/* handle connect error */
}
}
} /*< for */
@ -1638,7 +1650,7 @@ static bool select_connect_backend_servers(
{
BACKEND* b = backend_ref[i].bref_backend;
if (backend_ref[i].bref_dcb != NULL)
if (backend_ref[i].bref_state == BREF_IN_USE)
{
backend_type_t btype = BACKEND_TYPE(b);
@ -1726,7 +1738,7 @@ static bool select_connect_backend_servers(
/** Clean up connections */
for (i=0; i<router_nservers; i++)
{
if (backend_ref[i].bref_dcb != NULL)
if (backend_ref[i].bref_state == BREF_IN_USE)
{
ss_dassert(backend_ref[i].bref_backend->backend_conn_count > 0);
/** disconnect opened connections */
@ -2075,7 +2087,7 @@ static bool execute_sescmd_in_backend(
int rc = 0;
sescmd_cursor_t* scur;
if (backend_ref->bref_dcb == NULL)
if (backend_ref->bref_state == BREF_CLOSED)
{
goto return_succp;
}
@ -2350,7 +2362,7 @@ static bool route_session_write(
{
DCB* dcb = backend_ref[i].bref_dcb;
if (dcb != NULL)
if (backend_ref[i].bref_state == BREF_IN_USE)
{
rc = dcb->func.write(dcb, gwbuf_clone(querybuf));
@ -2383,14 +2395,20 @@ static bool route_session_write(
rses_property_add(router_cli_ses, prop);
for (i=0; i<router_cli_ses->rses_nbackends; i++)
{
if (backend_ref[i].bref_state == BREF_IN_USE)
{
succp = execute_sescmd_in_backend(&backend_ref[i]);
if (!succp)
{
/** Unlock router session */
rses_end_locked_router_action(router_cli_ses);
goto return_succp;
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Failed to execute session "
"command in %s:%d",
backend_ref[i].bref_backend->backend_server->name,
backend_ref[i].bref_backend->backend_server->port)));
}
}
}
/** Unlock router session */
@ -2451,3 +2469,83 @@ static void rwsplit_process_options(
}
} /*< for */
}
/**
* Error Handler routine
*
* The routine will handle errors that occurred in backend writes.
*
* @param instance The router instance
* @param router_session The router session
* @param message The error message to reply
* @param backend_dcb The backend DCB
* @param action The action: REPLY, REPLY_AND_CLOSE, NEW_CONNECTION
*
*/
static void handleError (
ROUTER *instance,
void *router_session,
char *message,
DCB *backend_dcb,
int action,
bool *succp)
{
DCB* client_dcb = NULL;
SESSION* session = backend_dcb->session;
client_dcb = session->client;
ss_dassert(client_dcb != NULL);
}
static void print_error_packet(
ROUTER_CLIENT_SES* rses,
GWBUF* buf,
DCB* dcb)
{
if (buf->gwbuf_type == GWBUF_TYPE_MYSQL)
{
while (gwbuf_length(buf) > 0)
{
/**
* This works with MySQL protocol only !
* Protocol specific packet print functions would be nice.
*/
uint8_t* ptr = GWBUF_DATA(buf);
size_t len = MYSQL_GET_PACKET_LEN(ptr);
if (MYSQL_GET_COMMAND(ptr) == 0xff)
{
SERVER* srv = NULL;
backend_ref_t* bref = rses->rses_backend_ref;
int i;
char* bufstr;
for (i=0; i<rses->rses_nbackends; i++)
{
if (bref[i].bref_dcb == dcb)
{
srv = bref[i].bref_backend->backend_server;
}
}
ss_dassert(srv != NULL);
bufstr = strndup(&ptr[7], len-3);
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Backend server %s:%d responded with "
"error : %s",
srv->name,
srv->port,
bufstr)));
free(bufstr);
}
buf = gwbuf_consume(buf, len+4);
}
}
else
{
while ((buf = gwbuf_consume(buf, GWBUF_LENGTH(buf))) != NULL);
}
}

View File

@ -229,6 +229,12 @@ typedef enum skygw_chk_t {
((c) == LEAST_ROUTER_CONNECTIONS ? "LEAST_ROUTER_CONNECTIONS" : \
((c) == LEAST_BEHIND_MASTER ? "LEAST_BEHIND_MASTER" : "Unknown criteria"))))
#define STRSRVSTATUS(s) ((SERVER_IS_RUNNING(s) && SERVER_IS_MASTER(s)) ? "RUNNING MASTER" : \
((SERVER_IS_RUNNING(s) && SERVER_IS_SLAVE(s)) ? "RUNNING SLAVE" : \
((SERVER_IS_RUNNING(s) && SERVER_IS_JOINED(s)) ? "RUNNING JOINED" : \
((SERVER_IS_RUNNING(s) && SERVER_IN_MAINT(s)) ? "RUNNING MAINTENANCE" : \
(SERVER_IS_RUNNING(s) ? "RUNNING (only)" : "NO STATUS")))))
#define CHK_MLIST(l) { \
ss_info_dassert((l->mlist_chk_top == CHK_NUM_MLIST && \
l->mlist_chk_tail == CHK_NUM_MLIST), \