Replaced oinlined protocol creation with call to mysql_protocol_init. Clean up.

This commit is contained in:
vraatikka
2013-09-01 00:36:31 +03:00
parent 9df2040a8a
commit 757a043386

View File

@ -509,7 +509,9 @@ int gw_read_client_event(DCB* dcb) {
MySQLProtocol *protocol = NULL; MySQLProtocol *protocol = NULL;
int b = -1; int b = -1;
int rc = 0; int rc = 0;
#if 0
dcb->state = dcb_begin_action(dcb, DCB_ACTION_READ);
#else
CHK_DCB(dcb); CHK_DCB(dcb);
if (dcb->state == DCB_STATE_DISCONNECTED || if (dcb->state == DCB_STATE_DISCONNECTED ||
@ -522,7 +524,7 @@ int gw_read_client_event(DCB* dcb) {
} }
ss_dassert(dcb->state == DCB_STATE_POLLING); ss_dassert(dcb->state == DCB_STATE_POLLING);
dcb->state = DCB_STATE_PROCESSING; dcb->state = DCB_STATE_PROCESSING;
#endif
protocol = DCB_PROTOCOL(dcb, MySQLProtocol); protocol = DCB_PROTOCOL(dcb, MySQLProtocol);
CHK_PROTOCOL(protocol); CHK_PROTOCOL(protocol);
/** /**
@ -540,8 +542,6 @@ int gw_read_client_event(DCB* dcb) {
eno, eno,
strerror(eno), strerror(eno),
dcb->state); dcb->state);
dcb->state = DCB_STATE_POLLING;
rc = 1; rc = 1;
goto return_rc; goto return_rc;
} else { } else {
@ -567,7 +567,6 @@ int gw_read_client_event(DCB* dcb) {
rc = gw_read_gwbuff(dcb, &gw_buffer, b); rc = gw_read_gwbuff(dcb, &gw_buffer, b);
if (rc != 0) { if (rc != 0) {
dcb->state = DCB_STATE_POLLING;
goto return_rc; goto return_rc;
} }
@ -635,7 +634,6 @@ int gw_read_client_event(DCB* dcb) {
rc = gw_read_gwbuff(dcb, &gw_buffer, b); rc = gw_read_gwbuff(dcb, &gw_buffer, b);
if (rc != 0) { if (rc != 0) {
dcb->state = DCB_STATE_POLLING;
goto return_rc; goto return_rc;
} }
/* Now, we are assuming in the first buffer there is /* Now, we are assuming in the first buffer there is
@ -662,9 +660,6 @@ int gw_read_client_event(DCB* dcb) {
/* fprintf(stderr, "COM_QUIT received with /* fprintf(stderr, "COM_QUIT received with
* no connected backends from %i\n", dcb->fd); */ * no connected backends from %i\n", dcb->fd); */
(dcb->func).close(dcb); (dcb->func).close(dcb);
dcb->state = DCB_STATE_POLLING;
rc = 1;
goto return_rc;
} else { } else {
/* Send a custom error as MySQL command reply */ /* Send a custom error as MySQL command reply */
mysql_send_custom_error( mysql_send_custom_error(
@ -673,10 +668,9 @@ int gw_read_client_event(DCB* dcb) {
0, 0,
"Connection to backend lost"); "Connection to backend lost");
protocol->state = MYSQL_IDLE; protocol->state = MYSQL_IDLE;
dcb->state = DCB_STATE_POLLING;
rc = 1;
goto return_rc;
} }
rc = 1;
goto return_rc;
} }
/* We can route the query */ /* We can route the query */
/* COM_QUIT handling */ /* COM_QUIT handling */
@ -701,7 +695,7 @@ int gw_read_client_event(DCB* dcb) {
//fprintf(stderr, "<<< Routing the Query ...\n"); //fprintf(stderr, "<<< Routing the Query ...\n");
router->routeQuery(router_instance, router->routeQuery(router_instance,
rsession, rsession,
queue); queue);
protocol->state = MYSQL_WAITING_RESULT; protocol->state = MYSQL_WAITING_RESULT;
} }
break; break;
@ -742,15 +736,13 @@ int gw_write_client_event(DCB *dcb) {
return 1; return 1;
} }
if ((protocol->state == MYSQL_IDLE) || (protocol->state == MYSQL_WAITING_RESULT)) { if (protocol->state == MYSQL_IDLE ||
int w; protocol->state == MYSQL_WAITING_RESULT)
{
w = dcb_drain_writeq(dcb); dcb_drain_writeq(dcb);
dcb->state = DCB_STATE_POLLING; dcb->state = DCB_STATE_POLLING;
return 1; return 1;
} }
dcb->state = DCB_STATE_POLLING; dcb->state = DCB_STATE_POLLING;
return 1; return 1;
} }
@ -763,19 +755,21 @@ int gw_MySQLListener(DCB *listener, char *config_bind) {
struct sockaddr_in serv_addr; struct sockaddr_in serv_addr;
char *bind_address_and_port = NULL; char *bind_address_and_port = NULL;
char *p; char *p;
char address[1024]=""; char address[1024] = "";
int port=0; int port=0;
int one = 1; int one = 1;
// this gateway, as default, will bind on port 4404 for localhost only // this gateway, as default, will bind on port 4404 for localhost only
(config_bind != NULL) ? (bind_address_and_port = config_bind) : (bind_address_and_port = "127.0.0.1:4406"); if (config_bind != NULL) {
bind_address_and_port = config_bind;
} else {
bind_address_and_port = "127.0.0.1:4406";
}
listener->fd = -1; listener->fd = -1;
memset(&serv_addr, 0, sizeof serv_addr); memset(&serv_addr, 0, sizeof serv_addr);
serv_addr.sin_family = AF_INET; serv_addr.sin_family = AF_INET;
p = strchr(bind_address_and_port, ':'); p = strchr(bind_address_and_port, ':');
if (p) { if (p) {
strncpy(address, bind_address_and_port, sizeof(address)); strncpy(address, bind_address_and_port, sizeof(address));
address[sizeof(address)-1] = '\0'; address[sizeof(address)-1] = '\0';
@ -783,22 +777,25 @@ int gw_MySQLListener(DCB *listener, char *config_bind) {
*p = '\0'; *p = '\0';
port = atoi(p+1); port = atoi(p+1);
setipaddress(&serv_addr.sin_addr, address); setipaddress(&serv_addr.sin_addr, address);
snprintf(address, (sizeof(address) - 1), "%s", inet_ntoa(serv_addr.sin_addr)); snprintf(address,
(sizeof(address) - 1),
"%s",
inet_ntoa(serv_addr.sin_addr));
} else { } else {
port = atoi(bind_address_and_port); port = atoi(bind_address_and_port);
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
sprintf(address, "0.0.0.0"); sprintf(address, "0.0.0.0");
} }
serv_addr.sin_port = htons(port); serv_addr.sin_port = htons(port);
// socket create // socket create
if ((l_so = socket(AF_INET, SOCK_STREAM, 0)) < 0) { if ((l_so = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
fprintf(stderr, ">>> Error: can't open listening socket. Errno %i, %s\n", errno, strerror(errno)); fprintf(stderr,
">>> Error: can't open listening socket. Errno %i, %s\n",
errno, strerror(errno));
return 0; return 0;
} }
// socket options // socket options
setsockopt(l_so, SOL_SOCKET, SO_REUSEADDR, (char *)&one, sizeof(one)); setsockopt(l_so, SOL_SOCKET, SO_REUSEADDR, (char *)&one, sizeof(one));
@ -807,30 +804,37 @@ int gw_MySQLListener(DCB *listener, char *config_bind) {
// bind address and port // bind address and port
if (bind(l_so, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) { if (bind(l_so, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
fprintf(stderr, ">>> Bind failed !!! %i, [%s]\n", errno, strerror(errno)); fprintf(stderr,
">>> Bind failed !!! %i, [%s]\n",
errno,
strerror(errno));
fprintf(stderr, ">>> can't bind to address and port"); fprintf(stderr, ">>> can't bind to address and port");
return 0; return 0;
} }
fprintf(stderr,
fprintf(stderr, ">> GATEWAY bind is: %s:%i. FD is %i\n", address, port, l_so); ">> GATEWAY bind is: %s:%i. FD is %i\n",
address,
port,
l_so);
listen(l_so, 10 * SOMAXCONN); listen(l_so, 10 * SOMAXCONN);
fprintf(stderr,
fprintf(stderr, ">> GATEWAY listen backlog queue is %i\n", 10 * SOMAXCONN); ">> GATEWAY listen backlog queue is %i\n",
10 * SOMAXCONN);
listener->state = DCB_STATE_IDLE; listener->state = DCB_STATE_IDLE;
// assign l_so to dcb // assign l_so to dcb
listener->fd = l_so; listener->fd = l_so;
// add listening socket to poll structure // add listening socket to poll structure
if (poll_add_dcb(listener) == -1) { if (poll_add_dcb(listener) == -1) {
fprintf(stderr, ">>> poll_add_dcb: can't add the listen_sock! Errno %i, %s\n", errno, strerror(errno)); fprintf(stderr,
">>> poll_add_dcb: can't add the listen_sock! Errno "
"%i, %s\n",
errno,
strerror(errno));
return 0; return 0;
} }
listener->func.accept = gw_MySQLAccept; listener->func.accept = gw_MySQLAccept;
listener->state = DCB_STATE_LISTENING; listener->state = DCB_STATE_LISTENING;
return 1; return 1;
@ -845,8 +849,8 @@ int gw_MySQLAccept(DCB *listener) {
int c_sock; int c_sock;
struct sockaddr_in local; struct sockaddr_in local;
socklen_t addrlen = sizeof(struct sockaddr_in); socklen_t addrlen = sizeof(struct sockaddr_in);
DCB *client_dcb; DCB *client_dcb;
MySQLProtocol *protocol; MySQLProtocol *protocol;
int sendbuf = GW_BACKEND_SO_SNDBUF; int sendbuf = GW_BACKEND_SO_SNDBUF;
socklen_t optlen = sizeof(sendbuf); socklen_t optlen = sizeof(sendbuf);
@ -870,7 +874,11 @@ int gw_MySQLAccept(DCB *listener) {
listener->stats.n_accepts++; listener->stats.n_accepts++;
fprintf(stderr, "Processing %i connection fd %i for listener %i\n", listener->stats.n_accepts, c_sock, listener->fd); fprintf(stderr,
"Processing %i connection fd %i for listener %i\n",
listener->stats.n_accepts,
c_sock,
listener->fd);
// set nonblocking // set nonblocking
setsockopt(c_sock, SOL_SOCKET, SO_SNDBUF, &sendbuf, optlen); setsockopt(c_sock, SOL_SOCKET, SO_SNDBUF, &sendbuf, optlen);
@ -880,16 +888,17 @@ int gw_MySQLAccept(DCB *listener) {
client_dcb->service = listener->session->service; client_dcb->service = listener->session->service;
client_dcb->fd = c_sock; client_dcb->fd = c_sock;
client_dcb->remote = strdup(inet_ntoa(local.sin_addr)); client_dcb->remote = strdup(inet_ntoa(local.sin_addr));
protocol = mysql_protocol_init(client_dcb);
protocol = (MySQLProtocol *) calloc(1, sizeof(MySQLProtocol)); if (protocol == NULL) {
protocol->protocol_chk_top = CHK_NUM_PROTOCOL; skygw_log_write_flush(
protocol->protocol_chk_tail = CHK_NUM_PROTOCOL; LOGFILE_ERROR,
"%lu [gw_MySQLAccept] Failed to create "
client_dcb->protocol = (void *)protocol; "protocol object for client connection.",
protocol->state = MYSQL_ALLOC; pthread_self());
protocol->descriptor = client_dcb; return 1;
protocol->fd = c_sock; }
// assign function poiters to "func" field // assign function poiters to "func" field
memcpy(&client_dcb->func, &MyObject, sizeof(GWPROTOCOL)); memcpy(&client_dcb->func, &MyObject, sizeof(GWPROTOCOL));