: dcb_alloc switched malloc to calloc, dcb->fd is initialized to -1. 
: dcb_process_zombies return value of close(fd) is checked. Closed fds are stored to conn_open array. 
: dcb_connect assigns new fd only if backend connection succeeds. Dcb is added to poll set in the same way, only if connect succeed.
gateway.c : conn_open array is initialized in main. For each created socket, a true is set to corresponding slot. Max. number of slots is 1024.
mysql_client_server_protocol.h : gw_do_connect_to declaration 
mysql_backend.c : gw_create_backend_connection returns rv >= 0 and a protocol which is assigned to backend_dcb. In error, -1 is returned and fd is not set.
mysql_client.c : conn_open array is kept up-to-date and protocol pointer is assigned also to dcb outside mysql_protocol_init.
mysql_common.c : gw_do_connect_to_backend 3rd argument is pointer to fd, not protocol. dcb is added to poll set later in dcb_connect.
skygw_debug.h : define conn_open[1024] array where open connections can be marked in debug build.
This commit is contained in:
vraatikka
2013-09-13 22:10:40 +03:00
parent c3fba63b45
commit 710fc5cfa6
7 changed files with 135 additions and 73 deletions

View File

@ -86,7 +86,7 @@ DCB * dcb_alloc(
{ {
DCB *rval; DCB *rval;
if ((rval = malloc(sizeof(DCB))) == NULL) if ((rval = calloc(1, sizeof(DCB))) == NULL)
{ {
return NULL; return NULL;
} }
@ -95,7 +95,6 @@ DCB *rval;
rval->dcb_chk_tail = CHK_NUM_DCB; rval->dcb_chk_tail = CHK_NUM_DCB;
#endif #endif
rval->dcb_role = role; rval->dcb_role = role;
simple_mutex_init(&rval->dcb_write_lock, "DCB write mutex"); simple_mutex_init(&rval->dcb_write_lock, "DCB write mutex");
simple_mutex_init(&rval->dcb_read_lock, "DCB read mutex"); simple_mutex_init(&rval->dcb_read_lock, "DCB read mutex");
rval->dcb_write_active = false; rval->dcb_write_active = false;
@ -104,18 +103,21 @@ DCB *rval;
spinlock_init(&rval->writeqlock); spinlock_init(&rval->writeqlock);
spinlock_init(&rval->delayqlock); spinlock_init(&rval->delayqlock);
spinlock_init(&rval->authlock); spinlock_init(&rval->authlock);
#if 0
rval->writeq = NULL; rval->writeq = NULL;
rval->delayq = NULL; rval->delayq = NULL;
rval->remote = NULL; rval->remote = NULL;
rval->state = DCB_STATE_ALLOC;
rval->next = NULL; rval->next = NULL;
rval->data = NULL; rval->data = NULL;
rval->protocol = NULL; rval->protocol = NULL;
rval->session = NULL; rval->session = NULL;
memset(&rval->stats, 0, sizeof(DCBSTATS)); // Zero the statistics rval->memdata.next = NULL;
bitmask_init(&rval->memdata.bitmask);
rval->memdata.next = NULL;
rval->command = 0; rval->command = 0;
#endif
rval->fd = -1;
memset(&rval->stats, 0, sizeof(DCBSTATS)); // Zero the statistics
rval->state = DCB_STATE_ALLOC;
bitmask_init(&rval->memdata.bitmask);
spinlock_acquire(&dcbspin); spinlock_acquire(&dcbspin);
if (allDCBs == NULL) if (allDCBs == NULL)
@ -391,11 +393,38 @@ bool succp = false;
while (dcb != NULL) { while (dcb != NULL) {
DCB* dcb_next = NULL; DCB* dcb_next = NULL;
int rc = 0;
/** /**
* Close file descriptor and move to clean-up phase. * Close file descriptor and move to clean-up phase.
*/ */
close(dcb->fd); rc = close(dcb->fd);
ss_debug(dcb->fd = 0;)
if (rc < 0) {
int eno = errno;
errno = 0;
skygw_log_write_flush(
LOGFILE_ERROR,
"%lu [dcb_process_zombies] Failed to close socket "
"%d on dcb %p due error %d, %s.",
pthread_self(),
dcb->fd,
dcb,
eno,
strerror(eno));
}
#if defined(SS_DEBUG)
else {
skygw_log_write_flush(
LOGFILE_TRACE,
"%lu [dcb_process_zombies] Closed socket "
"%d on dcb %p.",
pthread_self(),
dcb->fd,
dcb);
conn_open[dcb->fd] = false;
ss_debug(dcb->fd = 0;)
}
#endif
succp = dcb_set_state(dcb, DCB_STATE_DISCONNECTED, NULL); succp = dcb_set_state(dcb, DCB_STATE_DISCONNECTED, NULL);
ss_dassert(succp); ss_dassert(succp);
dcb_next = dcb->memdata.next; dcb_next = dcb->memdata.next;
@ -422,6 +451,7 @@ dcb_connect(SERVER *server, SESSION *session, const char *protocol)
DCB *dcb; DCB *dcb;
GWPROTOCOL *funcs; GWPROTOCOL *funcs;
int val; int val;
int fd;
if ((dcb = dcb_alloc(DCB_ROLE_REQUEST_HANDLER)) == NULL) if ((dcb = dcb_alloc(DCB_ROLE_REQUEST_HANDLER)) == NULL)
{ {
@ -448,20 +478,28 @@ int val;
return NULL; return NULL;
} }
if ((dcb->fd = dcb->func.connect(dcb, server, session)) == -1) fd = dcb->func.connect(dcb, server, session);
{
if (fd == -1) {
skygw_log_write_flush( skygw_log_write_flush(
LOGFILE_ERROR, LOGFILE_ERROR,
"%lu [dcb_connect] Failed to connect to server %s:%d, " "%lu [dcb_connect] Failed to connect to server %s:%d, "
"from backend dcb %p\n", "from backend dcb %p, client dcp %p fd %d\n",
pthread_self(), pthread_self(),
server->name, server->name,
server->port, server->port,
dcb); dcb,
session->client,
session->client->fd);
dcb_set_state(dcb, DCB_STATE_DISCONNECTED, NULL); dcb_set_state(dcb, DCB_STATE_DISCONNECTED, NULL);
dcb_final_free(dcb); dcb_final_free(dcb);
return NULL; return NULL;
} }
ss_dassert(dcb->fd = -1);
/**
* Successfully connected to backend. Assign file descriptor to dcb
*/
dcb->fd = fd;
/* /*
* The dcb will be addded into poll set by dcb->func.connect * The dcb will be addded into poll set by dcb->func.connect
@ -469,11 +507,18 @@ int val;
atomic_add(&server->stats.n_connections, 1); atomic_add(&server->stats.n_connections, 1);
atomic_add(&server->stats.n_current, 1); atomic_add(&server->stats.n_current, 1);
/* /**
* We are now connected, the authentication etc will happen as * backend_dcb is connected to backend server, and once backend_dcb
* part of the EPOLLOUT event that will be received once the connection * is added to poll set, authentication takes place as part of
* EPOLLOUT event that will be received once the connection
* is established. * is established.
*/ */
/**
* Add the dcb in the poll set
*/
poll_add_dcb(dcb);
return dcb; return dcb;
} }

View File

@ -210,6 +210,9 @@ void* log_flush_thr = NULL;
ssize_t log_flush_timeout_ms = 0; ssize_t log_flush_timeout_ms = 0;
int l; int l;
#if defined(SS_DEBUG)
memset(conn_open, 0, sizeof(bool)*1024);
#endif
l = atexit(skygw_logmanager_exit); l = atexit(skygw_logmanager_exit);
if (l != 0) { if (l != 0) {

View File

@ -251,7 +251,7 @@ int gw_send_authentication_to_backend(
uint8_t *passwd, uint8_t *passwd,
MySQLProtocol *conn); MySQLProtocol *conn);
const char *gw_mysql_protocol_state2string(int state); const char *gw_mysql_protocol_state2string(int state);
int gw_do_connect_to_backend(char *host, int port, MySQLProtocol *conn); int gw_do_connect_to_backend(char *host, int port, int* fd);
int mysql_send_custom_error ( int mysql_send_custom_error (
DCB *dcb, DCB *dcb,
int packet_number, int packet_number,

View File

@ -486,10 +486,13 @@ static int gw_error_backend_event(DCB *dcb) {
* This routine will connect to a backend server and it is called by dbc_connect * This routine will connect to a backend server and it is called by dbc_connect
* in router->newSession * in router->newSession
* *
* @param backend The Backend DCB allocated from dcb_connect * @param backend_dcb, in, out, use - backend DCB allocated from dcb_connect
* @param server The selected server to connect to * @param server, in, use - server to connect to
* @param session The current session from Client DCB * @param session, in use - current session from client DCB
* @return 0 on Success or 1 on Failure. * @return 0/1 on Success and -1 on Failure.
* If succesful, returns positive fd to socket which is connected to
* backend server. Positive fd is copied to protocol and to dcb.
* If fails, fd == -1 and socket is closed.
*/ */
static int gw_create_backend_connection( static int gw_create_backend_connection(
DCB *backend_dcb, DCB *backend_dcb,
@ -507,66 +510,65 @@ static int gw_create_backend_connection(
skygw_log_write_flush( skygw_log_write_flush(
LOGFILE_ERROR, LOGFILE_ERROR,
"%lu [gw_create_backend_connection] Failed to create " "%lu [gw_create_backend_connection] Failed to create "
"protocol object for back-end connection.", "protocol object for backend connection.",
pthread_self()); pthread_self());
goto return_fd; goto return_fd;
} }
rv = gw_do_connect_to_backend(server->name, server->port, protocol);
/**
* We could also move later, this in to the gw_do_connect_to_backend
* using protocol->descriptor
* NOTE that protocol->fd can be -1 too. Not sure if it is necessary.
*/
backend_dcb->fd = protocol->fd;
fd = backend_dcb->fd;
/** if succeed, fd > 0, -1 otherwise */
rv = gw_do_connect_to_backend(server->name, server->port, &fd);
/** Assign fd with protocol */
protocol->fd = fd;
/** Assign protocol with backend_dcb */
backend_dcb->protocol = protocol;
/** Set protocol state */
switch (rv) { switch (rv) {
case 0: case 0:
ss_dassert(backend_dcb->fd > 0); ss_dassert(fd > 0);
protocol->state = MYSQL_CONNECTED; protocol->state = MYSQL_CONNECTED;
skygw_log_write( skygw_log_write(
LOGFILE_TRACE, LOGFILE_TRACE,
"%lu [gw_create_backend_connection] Established " "%lu [gw_create_backend_connection] Established "
"connection to %s:%i, backend fd %d client " "connection to %s:%i, protocol fd %d client "
"fd %d.", "fd %d.",
pthread_self(), pthread_self(),
server->name, server->name,
server->port, server->port,
backend_dcb->fd, protocol->fd,
session->client->fd); session->client->fd);
break; break;
case 1: case 1:
ss_dassert(backend_dcb->fd > 0); ss_dassert(fd > 0);
protocol->state = MYSQL_PENDING_CONNECT; protocol->state = MYSQL_PENDING_CONNECT;
skygw_log_write( skygw_log_write(
LOGFILE_TRACE, LOGFILE_TRACE,
"%lu [gw_create_backend_connection] Connection " "%lu [gw_create_backend_connection] Connection "
"pending to %s:%i, backend fd %d client fd %d.", "pending to %s:%i, protocol fd %d client fd %d.",
pthread_self(), pthread_self(),
server->name, server->name,
server->port, server->port,
backend_dcb->fd, protocol->fd,
session->client->fd); session->client->fd);
break; break;
default: default:
ss_dassert(backend_dcb->fd == -1); ss_dassert(fd == -1);
ss_dassert(protocol->state == MYSQL_ALLOC); ss_dassert(protocol->state == MYSQL_ALLOC);
skygw_log_write( skygw_log_write(
LOGFILE_ERROR, LOGFILE_ERROR,
"%lu [gw_create_backend_connection] Connection " "%lu [gw_create_backend_connection] Connection "
"failed to %s:%i, backend fd %d client fd %d.", "failed to %s:%i, protocol fd %d client fd %d.",
pthread_self(), pthread_self(),
server->name, server->name,
server->port, server->port,
backend_dcb->fd, protocol->fd,
session->client->fd); session->client->fd);
break; break;
} /**< switch */ } /**< switch */
return_fd: return_fd:
ss_dassert(backend_dcb->fd == fd);
ss_dassert(backend_dcb->fd == protocol->fd);
return fd; return fd;
} }

View File

@ -826,6 +826,9 @@ int gw_MySQLListener(
strerror(errno)); strerror(errno));
return 0; return 0;
} }
#if defined(SS_DEBUG)
conn_open[l_so] = true;
#endif
listen_dcb->func.accept = gw_MySQLAccept; listen_dcb->func.accept = gw_MySQLAccept;
return 1; return 1;
@ -865,7 +868,6 @@ int gw_MySQLAccept(DCB *listener)
} }
else if (eno == ENFILE) else if (eno == ENFILE)
{ {
/** /**
* Exceeded system's max. number of files limit. * Exceeded system's max. number of files limit.
*/ */
@ -880,7 +882,6 @@ int gw_MySQLAccept(DCB *listener)
} }
else if (eno == EMFILE) else if (eno == EMFILE)
{ {
/** /**
* Exceeded processes max. number of files limit. * Exceeded processes max. number of files limit.
*/ */
@ -893,7 +894,6 @@ int gw_MySQLAccept(DCB *listener)
usleep(100*i*i++); usleep(100*i*i++);
goto retry_accept; goto retry_accept;
} }
else else
{ {
/** /**
@ -913,7 +913,16 @@ int gw_MySQLAccept(DCB *listener)
i = 0; i = 0;
listener->stats.n_accepts++; listener->stats.n_accepts++;
#if defined(SS_DEBUG)
if (c_sock > 0) {
skygw_log_write_flush(
LOGFILE_TRACE,
"%lu [gw_MySQLAccept] Accepted fd %d.",
pthread_self(),
c_sock);
conn_open[c_sock] = true;
}
#endif
fprintf(stderr, fprintf(stderr,
"Processing %i connection fd %i for listener %i\n", "Processing %i connection fd %i for listener %i\n",
listener->stats.n_accepts, listener->stats.n_accepts,
@ -940,6 +949,7 @@ int gw_MySQLAccept(DCB *listener)
pthread_self()); pthread_self());
return 1; return 1;
} }
client_dcb->protocol = protocol;
// assign function poiters to "func" field // assign function poiters to "func" field
memcpy(&client_dcb->func, &MyObject, sizeof(GWPROTOCOL)); memcpy(&client_dcb->func, &MyObject, sizeof(GWPROTOCOL));
//send handshake to the client_dcb //send handshake to the client_dcb

View File

@ -49,7 +49,8 @@ extern int gw_error_backend_event(DCB *dcb);
* @return * @return
* *
* *
* @details (write detailed description here) * @details Protocol structure does not have fd because dcb is not
* connected yet.
* *
*/ */
MySQLProtocol* mysql_protocol_init( MySQLProtocol* mysql_protocol_init(
@ -57,16 +58,6 @@ MySQLProtocol* mysql_protocol_init(
{ {
MySQLProtocol* p; MySQLProtocol* p;
CHK_DCB(dcb);
if (dcb == NULL) {
skygw_log_write_flush(
LOGFILE_ERROR,
"%lu [mysql_init_protocol] MySQL protocol init failed : "
"called with DCB == NULL.",
pthread_self());
return NULL;
}
p = (MySQLProtocol *) calloc(1, sizeof(MySQLProtocol)); p = (MySQLProtocol *) calloc(1, sizeof(MySQLProtocol));
ss_dassert(p != NULL); ss_dassert(p != NULL);
@ -87,9 +78,7 @@ MySQLProtocol* mysql_protocol_init(
p->protocol_chk_top = CHK_NUM_PROTOCOL; p->protocol_chk_top = CHK_NUM_PROTOCOL;
p->protocol_chk_tail = CHK_NUM_PROTOCOL; p->protocol_chk_tail = CHK_NUM_PROTOCOL;
#endif #endif
p->fd = dcb->fd;
p->owner_dcb = dcb; p->owner_dcb = dcb;
dcb->protocol = p;
CHK_PROTOCOL(p); CHK_PROTOCOL(p);
return_p: return_p:
return p; return p;
@ -528,30 +517,29 @@ int gw_send_authentication_to_backend(char *dbname, char *user, uint8_t *passwd,
/** /**
* gw_do_connect_to_backend * gw_do_connect_to_backend
* *
* This routine connects to a backend server using connect() in NON_BLOCKING mode * This routine creates socket and connects to a backend server.
* Connect it non-blocking operation. If connect fails, socket is closed.
* *
* @param host The host to connect to * @param host The host to connect to
* @param port The host TCP/IP port * @param port The host TCP/IP port
* @param conn The MySQLProtocol structure to fill * @param *fd where connected fd is copied
* @return 0 on success and !=0 on failure * @return 0/1 on success and -1 on failure
* If succesful, fd has file descriptor to socket which is connected to
* backend server. In failure, fd == -1 and socket is closed.
* *
*/ */
int gw_do_connect_to_backend( int gw_do_connect_to_backend(
char *host, char *host,
int port, int port,
MySQLProtocol *conn) int* fd)
{ {
struct sockaddr_in serv_addr; struct sockaddr_in serv_addr;
int rv; int rv;
int so = 0; int so = 0;
DCB* dcb = conn->owner_dcb;
CHK_DCB(dcb);
memset(&serv_addr, 0, sizeof serv_addr); memset(&serv_addr, 0, sizeof serv_addr);
serv_addr.sin_family = AF_INET; serv_addr.sin_family = AF_INET;
so = socket(AF_INET,SOCK_STREAM,0); so = socket(AF_INET,SOCK_STREAM,0);
conn->fd = so;
if (so < 0) { if (so < 0) {
int eno = errno; int eno = errno;
@ -567,8 +555,6 @@ int gw_do_connect_to_backend(
rv = -1; rv = -1;
goto return_rv; goto return_rv;
} }
/* Assign so to the caller dcb, conn->owner_dcb */
dcb->fd = so;
/* prepare for connect */ /* prepare for connect */
setipaddress(&serv_addr.sin_addr, host); setipaddress(&serv_addr.sin_addr, host);
serv_addr.sin_port = htons(port); serv_addr.sin_port = htons(port);
@ -593,12 +579,23 @@ int gw_do_connect_to_backend(
port, port,
eno, eno,
strerror(eno)); strerror(eno));
/** Close newly created socket. */
close(so);
goto return_rv;
} }
} }
/** *fd = so;
* Add the dcb in the poll set skygw_log_write_flush(
*/ LOGFILE_TRACE,
poll_add_dcb(dcb); "%lu [gw_do_connect_to_backend] Connected to backend server "
"%s:%d, fd %d.",
pthread_self(),
host,
port,
so);
#if defined(SS_DEBUG)
conn_open[so] = true;
#endif
return_rv: return_rv:
return rv; return rv;
} }

View File

@ -22,6 +22,7 @@
#define __USE_UNIX98 1 #define __USE_UNIX98 1
#include <pthread.h> #include <pthread.h>
#include <unistd.h> #include <unistd.h>
#include <stdbool.h>
#if !defined(SKYGW_DEBUG_H) #if !defined(SKYGW_DEBUG_H)
#define SKYGW_DEBUG_H #define SKYGW_DEBUG_H
@ -359,4 +360,8 @@ typedef enum skygw_chk_t {
"Session under- or overflow"); \ "Session under- or overflow"); \
} }
#if defined(SS_DEBUG)
bool conn_open[1024];
#endif
#endif /* SKYGW_DEBUG_H */ #endif /* SKYGW_DEBUG_H */