Added status checks, removed some dead code, changed macros to enumerated types, renamed variables to reflect better what they mean.
This commit is contained in:
parent
8c8475740a
commit
786468fc5d
@ -129,94 +129,7 @@ static void signal_set (int sig, void (*handler)(int)) {
|
||||
}
|
||||
}
|
||||
|
||||
int handle_event_errors(DCB *dcb) {
|
||||
|
||||
fprintf(stderr, "#### Handle error function for [%i] is [%s]\n", dcb->state, gw_dcb_state2string(dcb->state));
|
||||
|
||||
if (dcb->state == DCB_STATE_DISCONNECTED) {
|
||||
fprintf(stderr, "#### Handle error function, session is %p\n", dcb->session);
|
||||
return 1;
|
||||
}
|
||||
|
||||
#ifdef GW_EVENT_DEBUG
|
||||
if (event != -1) {
|
||||
fprintf(stderr, ">>>>>> DCB state %i, Protocol State %i: event %i, %i\n", dcb->state, protocol->state, event & EPOLLERR, event & EPOLLHUP);
|
||||
if(event & EPOLLHUP)
|
||||
fprintf(stderr, "EPOLLHUP\n");
|
||||
|
||||
if(event & EPOLLERR)
|
||||
fprintf(stderr, "EPOLLERR\n");
|
||||
|
||||
if(event & EPOLLPRI)
|
||||
fprintf(stderr, "EPOLLPRI\n");
|
||||
}
|
||||
#endif
|
||||
|
||||
if (dcb->state != DCB_STATE_LISTENING) {
|
||||
if (poll_remove_dcb(dcb) == -1) {
|
||||
fprintf(stderr, "poll_remove_dcb: from events check failed to delete %i, [%i]:[%s]\n", dcb->fd, errno, strerror(errno));
|
||||
}
|
||||
|
||||
#ifdef GW_EVENT_DEBUG
|
||||
fprintf(stderr, "closing fd [%i]=[%i], from events\n", dcb->fd, protocol->fd);
|
||||
#endif
|
||||
if (dcb->fd) {
|
||||
//fprintf(stderr, "Client protocol dcb->protocol %p\n", dcb->protocol);
|
||||
|
||||
gw_mysql_close((MySQLProtocol **)&dcb->protocol);
|
||||
fprintf(stderr, "Client protocol dcb->protocol %p\n", dcb->protocol);
|
||||
|
||||
dcb->state = DCB_STATE_DISCONNECTED;
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
fprintf(stderr, "Return from error handling, dcb is %p\n", dcb);
|
||||
//free(dcb->session);
|
||||
dcb->state = DCB_STATE_FREED;
|
||||
|
||||
fprintf(stderr, "#### Handle error function RETURN for [%i] is [%s]\n", dcb->state, gw_dcb_state2string(dcb->state));
|
||||
//free(dcb);
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
int handle_event_errors_backend(DCB *dcb) {
|
||||
|
||||
fprintf(stderr, "#### Handle Backend error function for %i\n", dcb->fd);
|
||||
|
||||
#ifdef GW_EVENT_DEBUG
|
||||
if (event != -1) {
|
||||
fprintf(stderr, ">>>>>> Backend DCB state %i, Protocol State %i: event %i, %i\n", dcb->state, dcb->proto_state, event & EPOLLERR, event & EPOLLHUP);
|
||||
if(event & EPOLLHUP)
|
||||
fprintf(stderr, "EPOLLHUP\n");
|
||||
|
||||
if(event & EPOLLERR)
|
||||
fprintf(stderr, "EPOLLERR\n");
|
||||
|
||||
if(event & EPOLLPRI)
|
||||
fprintf(stderr, "EPOLLPRI\n");
|
||||
}
|
||||
#endif
|
||||
|
||||
if (dcb->state != DCB_STATE_LISTENING) {
|
||||
if (poll_remove_dcb(dcb) == -1) {
|
||||
fprintf(stderr, "Backend poll_remove_dcb: from events check failed to delete %i, [%i]:[%s]\n", dcb->fd, errno, strerror(errno));
|
||||
}
|
||||
|
||||
#ifdef GW_EVENT_DEBUG
|
||||
fprintf(stderr, "Backend closing fd [%i]=%i, from events check\n", dcb->fd, protocol->fd);
|
||||
#endif
|
||||
if (dcb->fd) {
|
||||
dcb->state = DCB_STATE_DISCONNECTED;
|
||||
fprintf(stderr, "Freeing backend MySQL conn %p, %p\n", dcb->protocol, &dcb->protocol);
|
||||
gw_mysql_close((MySQLProtocol **)&dcb->protocol);
|
||||
fprintf(stderr, "Freeing backend MySQL conn %p, %p\n", dcb->protocol, &dcb->protocol);
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Cleanup the temporary data directory we created for the gateway
|
||||
|
@ -41,6 +41,10 @@
|
||||
#include <buffer.h>
|
||||
#include <openssl/sha.h>
|
||||
|
||||
#include <skygw_types.h>
|
||||
#include <skygw_utils.h>
|
||||
#include <log_manager.h>
|
||||
|
||||
|
||||
///////////////////////////////////////
|
||||
// MYSQL_conn structure setup
|
||||
@ -49,19 +53,28 @@ MySQLProtocol *gw_mysql_init(MySQLProtocol *data) {
|
||||
|
||||
MySQLProtocol *input = NULL;
|
||||
|
||||
// structure allocation
|
||||
input = calloc(1, sizeof(MySQLProtocol));
|
||||
|
||||
if (input == NULL) {
|
||||
// structure allocation
|
||||
input = calloc(1, sizeof(MySQLProtocol));
|
||||
|
||||
if (input == NULL)
|
||||
return NULL;
|
||||
|
||||
int eno = errno;
|
||||
errno = 0;
|
||||
skygw_log_write_flush(
|
||||
LOGFILE_ERROR,
|
||||
"%lu [gw_mysql_init] failed to allocate memory for MySQL "
|
||||
"protocol object. Errno %d, %s.",
|
||||
pthread_self(),
|
||||
eno,
|
||||
strerror(eno));
|
||||
goto return_input;
|
||||
}
|
||||
|
||||
input->protocol_chk_top = CHK_NUM_PROTOCOL;
|
||||
input->protocol_chk_tail = CHK_NUM_PROTOCOL;
|
||||
simple_mutex_init(&input->protocol_mutex, "MySQL Protocol mutex");
|
||||
#ifdef MYSQL_CONN_DEBUG
|
||||
fprintf(stderr, "gw_mysql_init() called\n");
|
||||
#endif
|
||||
|
||||
return_input:
|
||||
return input;
|
||||
}
|
||||
|
||||
|
@ -82,17 +82,52 @@
|
||||
|
||||
struct dcb;
|
||||
|
||||
#if 0
|
||||
/* MySQL Protocol States */
|
||||
#define MYSQL_ALLOC 0 /* Allocate data */
|
||||
#define MYSQL_PENDING_CONNECT 1 /* Backend socket pending connect */
|
||||
#define MYSQL_CONNECTED 2 /* Backend socket Connected */
|
||||
#define MYSQL_AUTH_SENT 3 /* Authentication handshake has been sent */
|
||||
#define MYSQL_AUTH_RECV 4 /* Received user, password, db and capabilities */
|
||||
#define MYSQL_AUTH_FAILED 5 /* Auth failed, return error packet */
|
||||
#define MYSQL_IDLE 6 /* Auth done. Protocol is idle, waiting for statements */
|
||||
#define MYSQL_ROUTING 7 /* The received command has been routed to backend(s) */
|
||||
#define MYSQL_WAITING_RESULT 8 /* Waiting for result set */
|
||||
#define MYSQL_SESSION_CHANGE 9 /* Pending session change */
|
||||
#else
|
||||
typedef enum {
|
||||
MYSQL_ALLOC,
|
||||
MYSQL_PENDING_CONNECT,
|
||||
MYSQL_CONNECTED,
|
||||
MYSQL_AUTH_SENT,
|
||||
MYSQL_AUTH_RECV,
|
||||
MYSQL_AUTH_FAILED,
|
||||
MYSQL_IDLE,
|
||||
MYSQL_ROUTING,
|
||||
MYSQL_WAITING_RESULT,
|
||||
MYSQL_SESSION_CHANGE
|
||||
} mysql_pstate_t;
|
||||
#endif
|
||||
|
||||
/*
|
||||
* MySQL Protocol specific state data
|
||||
*/
|
||||
typedef struct {
|
||||
skygw_chk_t protocol_chk_top;
|
||||
simple_mutex_t protocol_mutex;
|
||||
int fd; /* The socket descriptor */
|
||||
struct dcb *descriptor; /* The DCB of the socket we are running on */
|
||||
int state; /* Current descriptor state */
|
||||
uint8_t scramble[MYSQL_SCRAMBLE_LEN]; /* server scramble, created or received */
|
||||
uint32_t server_capabilities; /* server capabilities, created or received */
|
||||
uint32_t client_capabilities; /* client capabilities, created or received */
|
||||
unsigned long tid; /* MySQL Thread ID, in handshake */
|
||||
struct dcb *descriptor; /** The DCB of the socket
|
||||
* we are running on */
|
||||
mysql_pstate_t state; /** Current descriptor state */
|
||||
uint8_t scramble[MYSQL_SCRAMBLE_LEN]; /** server scramble,
|
||||
* created or received */
|
||||
uint32_t server_capabilities; /** server capabilities,
|
||||
* created or received */
|
||||
uint32_t client_capabilities; /** client capabilities,
|
||||
* created or received */
|
||||
unsigned long tid; /** MySQL Thread ID, in
|
||||
* handshake */
|
||||
skygw_chk_t protocol_chk_tail;
|
||||
} MySQLProtocol;
|
||||
|
||||
/*
|
||||
@ -105,17 +140,6 @@ typedef struct mysql_session {
|
||||
char db[MYSQL_DATABASE_MAXLEN]; /* database */
|
||||
} MYSQL_session;
|
||||
|
||||
/* MySQL Protocol States */
|
||||
#define MYSQL_ALLOC 0 /* Allocate data */
|
||||
#define MYSQL_PENDING_CONNECT 1 /* Backend socket pending connect */
|
||||
#define MYSQL_CONNECTED 2 /* Backend socket Connected */
|
||||
#define MYSQL_AUTH_SENT 3 /* Authentication handshake has been sent */
|
||||
#define MYSQL_AUTH_RECV 4 /* Received user, password, db and capabilities */
|
||||
#define MYSQL_AUTH_FAILED 5 /* Auth failed, return error packet */
|
||||
#define MYSQL_IDLE 6 /* Auth done. Protocol is idle, waiting for statements */
|
||||
#define MYSQL_ROUTING 7 /* The received command has been routed to backend(s) */
|
||||
#define MYSQL_WAITING_RESULT 8 /* Waiting for result set */
|
||||
#define MYSQL_SESSION_CHANGE 9 /* Pending session change */
|
||||
|
||||
/* MySQL states for authentication reply */
|
||||
#define MYSQL_FAILED_AUTHENTICATION 1
|
||||
|
@ -325,7 +325,7 @@ int n_connect = 0;
|
||||
client->remote = strdup(inet_ntoa(addr.sin_addr));
|
||||
memcpy(&client->func, &MyObject, sizeof(GWPROTOCOL));
|
||||
client->session = session_alloc(dcb->session->service, client);
|
||||
|
||||
ss_dassert(client->session->state != SESSION_STATE_ALLOC);
|
||||
client->state = DCB_STATE_IDLE;
|
||||
|
||||
/* create the session data for HTTPD */
|
||||
|
@ -89,10 +89,21 @@ static void *newSession(ROUTER *instance, SESSION *session);
|
||||
static void closeSession(ROUTER *instance, void *router_session);
|
||||
static int routeQuery(ROUTER *instance, void *router_session, GWBUF *queue);
|
||||
static void diagnostics(ROUTER *instance, DCB *dcb);
|
||||
static void clientReply(ROUTER* instance, void* router_session, GWBUF* queue, DCB *backend_dcb);
|
||||
static void clientReply(
|
||||
ROUTER *instance,
|
||||
void *router_session,
|
||||
GWBUF *queue,
|
||||
DCB *backend_dcb);
|
||||
|
||||
/** The module object definition */
|
||||
static ROUTER_OBJECT MyObject = { createInstance, newSession, closeSession, routeQuery, diagnostics, clientReply };
|
||||
static ROUTER_OBJECT MyObject = {
|
||||
createInstance,
|
||||
newSession,
|
||||
closeSession,
|
||||
routeQuery,
|
||||
diagnostics,
|
||||
clientReply
|
||||
};
|
||||
|
||||
static SPINLOCK instlock;
|
||||
static INSTANCE *instances;
|
||||
@ -250,7 +261,7 @@ static void *
|
||||
newSession(ROUTER *instance, SESSION *session)
|
||||
{
|
||||
INSTANCE *inst = (INSTANCE *)instance;
|
||||
CLIENT_SESSION *client;
|
||||
CLIENT_SESSION *client_ses;
|
||||
BACKEND *candidate = NULL;
|
||||
int i;
|
||||
|
||||
@ -263,7 +274,7 @@ int i;
|
||||
inst);
|
||||
|
||||
|
||||
if ((client = (CLIENT_SESSION *)malloc(sizeof(CLIENT_SESSION))) == NULL) {
|
||||
if ((client_ses = (CLIENT_SESSION *)malloc(sizeof(CLIENT_SESSION))) == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
/*
|
||||
@ -335,7 +346,7 @@ int i;
|
||||
"%lu [newSession] Couldn't find eligible candidate "
|
||||
"server. Exiting.",
|
||||
pthread_self());
|
||||
free(client);
|
||||
free(client_ses);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
@ -345,7 +356,7 @@ int i;
|
||||
*/
|
||||
atomic_add(&candidate->current_connection_count, 1);
|
||||
|
||||
client->backend = candidate;
|
||||
client_ses->backend = candidate;
|
||||
|
||||
skygw_log_write(
|
||||
LOGFILE_TRACE,
|
||||
@ -356,10 +367,10 @@ int i;
|
||||
candidate->current_connection_count);
|
||||
/*
|
||||
* Open a backend connection, putting the DCB for this
|
||||
* connection in the client->dcb
|
||||
* connection in the client_ses->dcb
|
||||
*/
|
||||
|
||||
if ((client->dcb = dcb_connect(candidate->server, session,
|
||||
if ((client_ses->dcb = dcb_connect(candidate->server, session,
|
||||
candidate->server->protocol)) == NULL)
|
||||
{
|
||||
atomic_add(&candidate->current_connection_count, -1);
|
||||
@ -369,7 +380,7 @@ int i;
|
||||
"server in port %d. Exiting.",
|
||||
pthread_self(),
|
||||
candidate->server->port);
|
||||
free(client);
|
||||
free(client_ses);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
@ -377,10 +388,10 @@ int i;
|
||||
|
||||
/* Add this session to the list of active sessions */
|
||||
spinlock_acquire(&inst->lock);
|
||||
client->next = inst->connections;
|
||||
inst->connections = client;
|
||||
client_ses->next = inst->connections;
|
||||
inst->connections = client_ses;
|
||||
spinlock_release(&inst->lock);
|
||||
return (void *)client;
|
||||
return (void *)client_ses;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -394,36 +405,37 @@ static void
|
||||
closeSession(ROUTER *instance, void *router_session)
|
||||
{
|
||||
INSTANCE *inst = (INSTANCE *)instance;
|
||||
CLIENT_SESSION *session = (CLIENT_SESSION *)router_session;
|
||||
bool succp = FALSE;
|
||||
CLIENT_SESSION *client_ses = (CLIENT_SESSION *)router_session;
|
||||
bool succp = FALSE;
|
||||
|
||||
/*
|
||||
* Close the connection to the backend
|
||||
*/
|
||||
skygw_log_write_flush(
|
||||
LOGFILE_TRACE,
|
||||
"%lu [closeSession] closing session with router_session "
|
||||
"%lu [closeSession] closing session with "
|
||||
"router_session "
|
||||
"%p, and inst %p.",
|
||||
pthread_self(),
|
||||
session,
|
||||
client_ses,
|
||||
inst);
|
||||
succp = session->dcb->func.close(session->dcb);
|
||||
succp = client_ses->dcb->func.close(client_ses->dcb);
|
||||
if (succp) {
|
||||
session->dcb = NULL;
|
||||
client_ses->dcb = NULL;
|
||||
}
|
||||
atomic_add(&session->backend->current_connection_count, -1);
|
||||
atomic_add(&session->backend->server->stats.n_current, -1);
|
||||
atomic_add(&client_ses->backend->current_connection_count, -1);
|
||||
atomic_add(&client_ses->backend->server->stats.n_current, -1);
|
||||
|
||||
spinlock_acquire(&inst->lock);
|
||||
if (inst->connections == session)
|
||||
inst->connections = session->next;
|
||||
if (inst->connections == client_ses)
|
||||
inst->connections = client_ses->next;
|
||||
else
|
||||
{
|
||||
CLIENT_SESSION *ptr = inst->connections;
|
||||
while (ptr && ptr->next != session)
|
||||
while (ptr && ptr->next != client_ses)
|
||||
ptr = ptr->next;
|
||||
if (ptr)
|
||||
ptr->next = session->next;
|
||||
ptr->next = client_ses->next;
|
||||
}
|
||||
spinlock_release(&inst->lock);
|
||||
|
||||
@ -432,7 +444,7 @@ bool succp = FALSE;
|
||||
* all the memory and other resources associated
|
||||
* to the client session.
|
||||
*/
|
||||
free(session);
|
||||
free(client_ses);
|
||||
}
|
||||
|
||||
/**
|
||||
|
Loading…
x
Reference in New Issue
Block a user