Merge branch 'release-1.3.0' into develop

This commit is contained in:
Markus Makela
2016-02-11 10:15:36 +02:00
15 changed files with 352 additions and 373 deletions

View File

@ -51,12 +51,11 @@
*/
#include <modinfo.h>
MODULE_INFO info =
{
MODULE_API_PROTOCOL,
MODULE_GA,
GWPROTOCOL_VERSION,
"The MySQL to backend server protocol"
MODULE_INFO info = {
MODULE_API_PROTOCOL,
MODULE_GA,
GWPROTOCOL_VERSION,
"The MySQL to backend server protocol"
};
static char *version_str = "V2.0.0";
@ -70,8 +69,8 @@ static int gw_backend_hangup(DCB *dcb);
static int backend_write_delayqueue(DCB *dcb);
static void backend_set_delayqueue(DCB *dcb, GWBUF *queue);
static int gw_change_user(DCB *backend_dcb, SERVER *server, SESSION *in_session, GWBUF *queue);
static GWBUF* process_response_data (DCB* dcb, GWBUF* readbuf, int nbytes_to_process);
extern char* create_auth_failed_msg( GWBUF* readbuf, char* hostaddr, uint8_t* sha1);
static GWBUF* process_response_data(DCB* dcb, GWBUF* readbuf, int nbytes_to_process);
extern char* create_auth_failed_msg(GWBUF* readbuf, char* hostaddr, uint8_t* sha1);
extern char* create_auth_fail_str(char *username, char *hostaddr, char *sha1, char *db, int errcode);
static bool sescmd_response_complete(DCB* dcb);
@ -79,21 +78,20 @@ static bool sescmd_response_complete(DCB* dcb);
#if defined(NOT_USED)
static int gw_session(DCB *backend_dcb, void *data);
#endif
static MYSQL_session* gw_get_shared_session_auth_info(DCB* dcb);
static bool gw_get_shared_session_auth_info(DCB* dcb, MYSQL_session* session);
static GWPROTOCOL MyObject =
{
gw_read_backend_event, /* Read - EPOLLIN handler */
gw_MySQLWrite_backend, /* Write - data from gateway */
gw_write_backend_event, /* WriteReady - EPOLLOUT handler */
gw_error_backend_event, /* Error - EPOLLERR handler */
gw_backend_hangup, /* HangUp - EPOLLHUP handler */
NULL, /* Accept */
gw_create_backend_connection, /* Connect */
gw_backend_close, /* Close */
NULL, /* Listen */
gw_change_user, /* Authentication */
NULL /* Session */
static GWPROTOCOL MyObject = {
gw_read_backend_event, /* Read - EPOLLIN handler */
gw_MySQLWrite_backend, /* Write - data from gateway */
gw_write_backend_event, /* WriteReady - EPOLLOUT handler */
gw_error_backend_event, /* Error - EPOLLERR handler */
gw_backend_hangup, /* HangUp - EPOLLHUP handler */
NULL, /* Accept */
gw_create_backend_connection, /* Connect */
gw_backend_close, /* Close */
NULL, /* Listen */
gw_change_user, /* Authentication */
NULL /* Session */
};
/*
@ -127,28 +125,34 @@ GWPROTOCOL* GetModuleObject()
return &MyObject;
}
static MYSQL_session* gw_get_shared_session_auth_info(DCB* dcb)
/**
* Copy shared session authentication info
*
* @param dcb A backend DCB
* @param session Destination where authentication data is copied
*/
static bool gw_get_shared_session_auth_info(DCB* dcb, MYSQL_session* session)
{
MYSQL_session* auth_info = NULL;
bool rval = true;
CHK_DCB(dcb);
CHK_SESSION(dcb->session);
spinlock_acquire(&dcb->session->ses_lock);
if (dcb->session->state != SESSION_STATE_ALLOC && dcb->session->state != SESSION_STATE_DUMMY)
if (dcb->session->state != SESSION_STATE_ALLOC &&
dcb->session->state != SESSION_STATE_DUMMY)
{
auth_info = dcb->session->data;
memcpy(session, dcb->session->data, sizeof(MYSQL_session));
}
else
{
MXS_ERROR("%lu [gw_get_shared_session_auth_info] Couldn't get "
"session authentication info. Session in a wrong state %d.",
pthread_self(),
dcb->session->state);
pthread_self(), dcb->session->state);
rval = false;
}
spinlock_release(&dcb->session->ses_lock);
return auth_info;
return rval;
}
/**
@ -160,7 +164,7 @@ static int gw_read_backend_event(DCB *dcb)
{
MySQLProtocol *client_protocol = NULL;
MySQLProtocol *backend_protocol = NULL;
MYSQL_session *current_session = NULL;
MYSQL_session local_session;
int rc = 0;
CHK_DCB(dcb);
@ -178,8 +182,10 @@ static int gw_read_backend_event(DCB *dcb)
CHK_SESSION(dcb->session);
/*< return only with complete session */
current_session = gw_get_shared_session_auth_info(dcb);
ss_dassert(current_session != NULL);
if (!gw_get_shared_session_auth_info(dcb, &local_session))
{
goto return_rc;
}
backend_protocol = (MySQLProtocol *) dcb->protocol;
CHK_PROTOCOL(backend_protocol);
@ -229,9 +235,10 @@ static int gw_read_backend_event(DCB *dcb)
* Decode password and send the auth credentials
* to backend.
*/
if (gw_send_authentication_to_backend(current_session->db,
current_session->user,
current_session->client_sha1,
if (gw_send_authentication_to_backend(
local_session.db,
local_session.user,
local_session.client_sha1,
backend_protocol) != 0)
{
backend_protocol->protocol_auth_state = MYSQL_AUTH_FAILED;
@ -293,43 +300,43 @@ static int gw_read_backend_event(DCB *dcb)
switch (receive_rc)
{
case -1:
backend_protocol->protocol_auth_state = MYSQL_AUTH_FAILED;
MXS_DEBUG("%lu [gw_read_backend_event] after "
"gw_receive_backend_authentication "
"fd %d, state = MYSQL_AUTH_FAILED.",
pthread_self(),
backend_protocol->owner_dcb->fd);
case -1:
backend_protocol->protocol_auth_state = MYSQL_AUTH_FAILED;
MXS_DEBUG("%lu [gw_read_backend_event] after "
"gw_receive_backend_authentication "
"fd %d, state = MYSQL_AUTH_FAILED.",
pthread_self(),
backend_protocol->owner_dcb->fd);
MXS_ERROR("Backend server didn't "
"accept authentication for user "
"%s.",
current_session->user);
break;
case 1:
backend_protocol->protocol_auth_state = MYSQL_IDLE;
MXS_ERROR("Backend server didn't "
"accept authentication for user "
"%s.",
local_session.user);
break;
case 1:
backend_protocol->protocol_auth_state = MYSQL_IDLE;
MXS_DEBUG("%lu [gw_read_backend_event] "
"gw_receive_backend_auth succeed. "
"dcb %p fd %d, user %s.",
pthread_self(),
dcb,
dcb->fd,
current_session->user);
break;
default:
ss_dassert(receive_rc == 0);
MXS_DEBUG("%lu [gw_read_backend_event] "
"gw_receive_backend_auth read "
"successfully "
"nothing. dcb %p fd %d, user %s.",
pthread_self(),
dcb,
dcb->fd,
current_session->user);
rc = 0;
goto return_with_lock;
break;
MXS_DEBUG("%lu [gw_read_backend_event] "
"gw_receive_backend_auth succeed. "
"dcb %p fd %d, user %s.",
pthread_self(),
dcb,
dcb->fd,
local_session.user);
break;
default:
ss_dassert(receive_rc == 0);
MXS_DEBUG("%lu [gw_read_backend_event] "
"gw_receive_backend_auth read "
"successfully "
"nothing. dcb %p fd %d, user %s.",
pthread_self(),
dcb,
dcb->fd,
local_session.user);
rc = 0;
goto return_with_lock;
break;
} /* switch */
}
@ -338,9 +345,9 @@ static int gw_read_backend_event(DCB *dcb)
{
GWBUF* errbuf;
bool succp;
/**
* protocol state won't change anymore,
* lock can be freed
/**
* protocol state won't change anymore,
* lock can be freed
*/
spinlock_release(&dcb->authlock);
spinlock_acquire(&dcb->delayqlock);
@ -359,7 +366,7 @@ static int gw_read_backend_event(DCB *dcb)
* client session is not stopping. It is possible that authentication
* fails because the client has closed the connection before all
* backends have done authentication. */
if (backend_protocol->protocol_auth_state == MYSQL_AUTH_FAILED &&
if (backend_protocol->protocol_auth_state == MYSQL_AUTH_FAILED &&
dcb->session->state != SESSION_STATE_STOPPING)
{
service_refresh_users(dcb->session->service);
@ -416,7 +423,7 @@ static int gw_read_backend_event(DCB *dcb)
"user %s.",
pthread_self(),
dcb->fd,
current_session->user);
local_session.user);
/* check the delay queue and flush the data */
if (dcb->delayq)
@ -429,7 +436,7 @@ static int gw_read_backend_event(DCB *dcb)
spinlock_release(&dcb->authlock);
} /* MYSQL_AUTH_RECV || MYSQL_AUTH_FAILED || MYSQL_HANDSHAKE_FAILED */
} /* MYSQL_AUTH_RECV || MYSQL_AUTH_FAILED || MYSQL_HANDSHAKE_FAILED */
/* reading MySQL command output from backend and writing to the client */
{
@ -487,7 +494,7 @@ static int gw_read_backend_event(DCB *dcb)
if (dcb->dcb_readqueue)
{
read_buffer = gwbuf_append(dcb->dcb_readqueue,read_buffer);
read_buffer = gwbuf_append(dcb->dcb_readqueue, read_buffer);
}
nbytes_read = gwbuf_length(read_buffer);
@ -518,7 +525,7 @@ static int gw_read_backend_event(DCB *dcb)
* If protocol has session command set, concatenate whole
* response into one buffer.
*/
if (protocol_get_srv_command((MySQLProtocol *)dcb->protocol, false) != MYSQL_COM_UNDEFINED)
if (protocol_get_srv_command((MySQLProtocol *) dcb->protocol, false) != MYSQL_COM_UNDEFINED)
{
read_buffer = process_response_data(dcb, read_buffer, gwbuf_length(read_buffer));
/**
@ -537,7 +544,7 @@ static int gw_read_backend_event(DCB *dcb)
"Read buffer unexpectedly null, even though response "
"not marked as complete. User: %s",
pthread_self(),
current_session->user);
local_session.user);
rc = 0;
goto return_rc;
}
@ -610,7 +617,7 @@ static int gw_write_backend_event(DCB *dcb)
if (dcb->writeq != NULL)
{
data = (uint8_t *)GWBUF_DATA(dcb->writeq);
data = (uint8_t *) GWBUF_DATA(dcb->writeq);
if (dcb->session->client == NULL)
{
@ -691,30 +698,30 @@ static int gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
*/
switch (backend_protocol->protocol_auth_state)
{
case MYSQL_HANDSHAKE_FAILED:
case MYSQL_AUTH_FAILED:
if (dcb->session->state != SESSION_STATE_STOPPING)
{
MXS_ERROR("Unable to write to backend '%s' due to "
"%s failure. Server in state %s.",
dcb->server->unique_name,
backend_protocol->protocol_auth_state == MYSQL_HANDSHAKE_FAILED ?
"handshake" : "authentication",
STRSRVSTATUS(dcb->server));
}
/** Consume query buffer */
while ((queue = gwbuf_consume(
queue,
GWBUF_LENGTH(queue))) != NULL)
{
;
}
rc = 0;
spinlock_release(&dcb->authlock);
goto return_rc;
break;
case MYSQL_HANDSHAKE_FAILED:
case MYSQL_AUTH_FAILED:
if (dcb->session->state != SESSION_STATE_STOPPING)
{
MXS_ERROR("Unable to write to backend '%s' due to "
"%s failure. Server in state %s.",
dcb->server->unique_name,
backend_protocol->protocol_auth_state == MYSQL_HANDSHAKE_FAILED ?
"handshake" : "authentication",
STRSRVSTATUS(dcb->server));
}
/** Consume query buffer */
while ((queue = gwbuf_consume(
queue,
GWBUF_LENGTH(queue))) != NULL)
{
;
}
rc = 0;
spinlock_release(&dcb->authlock);
goto return_rc;
break;
case MYSQL_IDLE:
case MYSQL_IDLE:
{
uint8_t* ptr = GWBUF_DATA(queue);
int cmd = MYSQL_GET_COMMAND(ptr);
@ -744,9 +751,9 @@ static int gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
rc = dcb_write(dcb, queue);
goto return_rc;
}
break;
break;
default:
default:
{
MXS_DEBUG("%lu [gw_MySQLWrite_backend] delayed write to "
"dcb %p fd %d protocol state %s.",
@ -776,7 +783,7 @@ static int gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
rc = 1;
goto return_rc;
}
break;
break;
}
return_rc:
return rc;
@ -823,7 +830,7 @@ static int gw_error_backend_event(DCB *dcb)
len = sizeof(error);
if (getsockopt(dcb->fd, SOL_SOCKET, SO_ERROR, &error, (socklen_t *)&len) == 0)
if (getsockopt(dcb->fd, SOL_SOCKET, SO_ERROR, &error, (socklen_t *) & len) == 0)
{
if (error != 0)
{
@ -861,7 +868,7 @@ static int gw_error_backend_event(DCB *dcb)
int error, len;
len = sizeof(error);
if (getsockopt(dcb->fd, SOL_SOCKET, SO_ERROR, &error, (socklen_t *)&len) == 0)
if (getsockopt(dcb->fd, SOL_SOCKET, SO_ERROR, &error, (socklen_t *) & len) == 0)
{
if (error != 0)
{
@ -914,8 +921,8 @@ retblock:
* backend server. Positive fd is copied to protocol and to dcb.
* If fails, fd == -1 and socket is closed.
*/
static int gw_create_backend_connection(DCB *backend_dcb,
SERVER *server,
static int gw_create_backend_connection(DCB *backend_dcb,
SERVER *server,
SESSION *session)
{
MySQLProtocol *protocol = NULL;
@ -939,10 +946,10 @@ static int gw_create_backend_connection(DCB *backend_dcb,
{
/** Copy client flags to backend protocol */
protocol->client_capabilities =
((MySQLProtocol *)(backend_dcb->session->client->protocol))->client_capabilities;
((MySQLProtocol *) (backend_dcb->session->client->protocol))->client_capabilities;
/** Copy client charset to backend protocol */
protocol->charset =
((MySQLProtocol *)(backend_dcb->session->client->protocol))->charset;
((MySQLProtocol *) (backend_dcb->session->client->protocol))->charset;
}
else
{
@ -958,51 +965,50 @@ static int gw_create_backend_connection(DCB *backend_dcb,
/*< Set protocol state */
switch (rv)
{
case 0:
ss_dassert(fd > 0);
protocol->fd = fd;
protocol->protocol_auth_state = MYSQL_CONNECTED;
MXS_DEBUG("%lu [gw_create_backend_connection] Established "
"connection to %s:%i, protocol fd %d client "
"fd %d.",
pthread_self(),
server->name,
server->port,
protocol->fd,
session->client->fd);
break;
case 0:
ss_dassert(fd > 0);
protocol->fd = fd;
protocol->protocol_auth_state = MYSQL_CONNECTED;
MXS_DEBUG("%lu [gw_create_backend_connection] Established "
"connection to %s:%i, protocol fd %d client "
"fd %d.",
pthread_self(),
server->name,
server->port,
protocol->fd,
session->client->fd);
break;
case 1:
ss_dassert(fd > 0);
protocol->protocol_auth_state = MYSQL_PENDING_CONNECT;
protocol->fd = fd;
MXS_DEBUG("%lu [gw_create_backend_connection] Connection "
"pending to %s:%i, protocol fd %d client fd %d.",
pthread_self(),
server->name,
server->port,
protocol->fd,
session->client->fd);
break;
case 1:
ss_dassert(fd > 0);
protocol->protocol_auth_state = MYSQL_PENDING_CONNECT;
protocol->fd = fd;
MXS_DEBUG("%lu [gw_create_backend_connection] Connection "
"pending to %s:%i, protocol fd %d client fd %d.",
pthread_self(),
server->name,
server->port,
protocol->fd,
session->client->fd);
break;
default:
ss_dassert(fd == -1);
ss_dassert(protocol->protocol_auth_state == MYSQL_ALLOC);
MXS_DEBUG("%lu [gw_create_backend_connection] Connection "
"failed to %s:%i, protocol fd %d client fd %d.",
pthread_self(),
server->name,
server->port,
protocol->fd,
session->client->fd);
break;
default:
ss_dassert(fd == -1);
ss_dassert(protocol->protocol_auth_state == MYSQL_ALLOC);
MXS_DEBUG("%lu [gw_create_backend_connection] Connection "
"failed to %s:%i, protocol fd %d client fd %d.",
pthread_self(),
server->name,
server->port,
protocol->fd,
session->client->fd);
break;
} /*< switch */
return_fd:
return fd;
}
/**
* Error event handler.
* Create error message, pass it to router's error handler and if error
@ -1068,7 +1074,7 @@ static int gw_backend_hangup(DCB *dcb)
int error, len;
len = sizeof(error);
if (getsockopt(dcb->fd, SOL_SOCKET, SO_ERROR, &error, (socklen_t *)&len) == 0)
if (getsockopt(dcb->fd, SOL_SOCKET, SO_ERROR, &error, (socklen_t *) & len) == 0)
{
if (error != 0 && ses_state != SESSION_STATE_STOPPING)
{
@ -1238,13 +1244,12 @@ static int backend_write_delayqueue(DCB *dcb)
if (MYSQL_IS_CHANGE_USER(((uint8_t *)GWBUF_DATA(localq))))
{
MYSQL_session* mses;
GWBUF* new_packet;
MYSQL_session mses;
GWBUF* new_packet;
mses = (MYSQL_session *)dcb->session->client->data;
new_packet = gw_create_change_user_packet(mses,
(MySQLProtocol *)dcb->protocol);
/**
gw_get_shared_session_auth_info(dcb, &mses);
new_packet = gw_create_change_user_packet(&mses, dcb->protocol);
/**
* Remove previous packet which lacks scramble
* and append the new.
*/
@ -1315,10 +1320,10 @@ static int gw_change_user(DCB *backend,
MYSQL_session *current_session = NULL;
MySQLProtocol *backend_protocol = NULL;
MySQLProtocol *client_protocol = NULL;
char username[MYSQL_USER_MAXLEN+1]="";
char database[MYSQL_DATABASE_MAXLEN+1]="";
char current_database[MYSQL_DATABASE_MAXLEN+1]="";
uint8_t client_sha1[MYSQL_SCRAMBLE_LEN]="";
char username[MYSQL_USER_MAXLEN + 1] = "";
char database[MYSQL_DATABASE_MAXLEN + 1] = "";
char current_database[MYSQL_DATABASE_MAXLEN + 1] = "";
uint8_t client_sha1[MYSQL_SCRAMBLE_LEN] = "";
uint8_t *client_auth_packet = GWBUF_DATA(queue);
unsigned int auth_token_len = 0;
uint8_t *auth_token = NULL;
@ -1331,7 +1336,7 @@ static int gw_change_user(DCB *backend,
/* now get the user, after 4 bytes header and 1 byte command */
client_auth_packet += 5;
strncpy(username, (char *)client_auth_packet,MYSQL_USER_MAXLEN);
strncpy(username, (char *)client_auth_packet, MYSQL_USER_MAXLEN);
client_auth_packet += strlen(username) + 1;
/* get the auth token len */
@ -1354,7 +1359,7 @@ static int gw_change_user(DCB *backend,
}
/* get new database name */
strncpy(database, (char *)client_auth_packet,MYSQL_DATABASE_MAXLEN);
strncpy(database, (char *)client_auth_packet, MYSQL_DATABASE_MAXLEN);
/* get character set */
if (strlen(database))
@ -1371,45 +1376,48 @@ static int gw_change_user(DCB *backend,
memcpy(&backend_protocol->charset, client_auth_packet, sizeof(int));
}
spinlock_acquire(&in_session->ses_lock);
/* save current_database name */
strncpy(current_database, current_session->db,MYSQL_DATABASE_MAXLEN);
strncpy(current_database, current_session->db, MYSQL_DATABASE_MAXLEN);
/*
* Now clear database name in dcb as we don't do local authentication on db name for change user.
* Local authentication only for user@host and if successful the database name change is sent to backend.
*/
strcpy(current_session->db, "");
strncpy(current_session->db, "", MYSQL_DATABASE_MAXLEN);
/*
* decode the token and check the password.
* Decode the token and check the password.
* Note: if auth_token_len == 0 && auth_token == NULL, user is without password
*/
auth_ret = gw_check_mysql_scramble_data(backend->session->client,
auth_token,
auth_token_len,
auth_token, auth_token_len,
client_protocol->scramble,
sizeof(client_protocol->scramble),
username,
client_sha1);
username, client_sha1);
strncpy(current_session->db, current_database, MYSQL_DATABASE_MAXLEN);
spinlock_release(&in_session->ses_lock);
if (auth_ret != 0)
{
if (!service_refresh_users(backend->session->client->service))
if (service_refresh_users(backend->session->client->service) == 0)
{
/* Try authentication again with new repository data */
/* Note: if no auth client authentication will fail */
auth_ret = gw_check_mysql_scramble_data(backend->session->client,
spinlock_acquire(&in_session->ses_lock);
strncpy(current_session->db, "", MYSQL_DATABASE_MAXLEN);
auth_ret = gw_check_mysql_scramble_data(
backend->session->client,
auth_token, auth_token_len,
client_protocol->scramble,
sizeof(client_protocol->scramble),
username,
client_sha1);
username, client_sha1);
strncpy(current_session->db, current_database, MYSQL_DATABASE_MAXLEN);
spinlock_release(&in_session->ses_lock);
}
}
/* copy back current datbase to client session */
strcpy(current_session->db, current_database);
/* let's free the auth_token now */
if (auth_token)
{
@ -1457,7 +1465,9 @@ static int gw_change_user(DCB *backend,
MYSQL_COM_CHANGE_USER);
modutil_reply_auth_error(backend, message, 0);
rv = 1;
} else {
}
else
{
rv = gw_send_change_user_to_backend(database, username, client_sha1, backend_protocol);
/*
* Now copy new data into user session
@ -1473,7 +1483,6 @@ retblock:
return rv;
}
/**
* Move packets or parts of packets from readbuf to outbuf as the packet headers
* and lengths have been noticed and counted.
@ -1486,12 +1495,12 @@ retblock:
*
* @return GWBUF which includes complete MySQL packet
*/
static GWBUF* process_response_data(DCB* dcb,
static GWBUF* process_response_data(DCB* dcb,
GWBUF* readbuf,
int nbytes_to_process)
int nbytes_to_process)
{
int npackets_left = 0; /*< response's packet count */
ssize_t nbytes_left = 0; /*< nbytes to be read for the packet */
ssize_t nbytes_left = 0; /*< nbytes to be read for the packet */
MySQLProtocol* p;
GWBUF* outbuf = NULL;
int initial_packets = npackets_left;
@ -1560,7 +1569,7 @@ static GWBUF* process_response_data(DCB* dcb,
}
nbytes_to_process = 0;
}
/** Packet was read. All bytes belonged to the last packet. */
/** Packet was read. All bytes belonged to the last packet. */
else if (nbytes_left == nbytes_to_process)
{
nbytes_left = 0;
@ -1570,12 +1579,12 @@ static GWBUF* process_response_data(DCB* dcb,
outbuf = gwbuf_append(outbuf, readbuf);
readbuf = NULL;
}
/**
* Packet was read. There should be more since bytes were
* left over.
* Move the next packet to its own buffer and add that next
* to the prev packet's buffer.
*/
/**
* Packet was read. There should be more since bytes were
* left over.
* Move the next packet to its own buffer and add that next
* to the prev packet's buffer.
*/
else /*< nbytes_left < nbytes_to_process */
{
ss_dassert(nbytes_left >= 0);
@ -1583,8 +1592,8 @@ static GWBUF* process_response_data(DCB* dcb,
/** Move the prefix of the buffer to outbuf from redbuf */
outbuf = gwbuf_append(outbuf,
gwbuf_clone_portion(readbuf, 0, (size_t)nbytes_left));
readbuf = gwbuf_consume(readbuf, (size_t)nbytes_left);
gwbuf_clone_portion(readbuf, 0, (size_t) nbytes_left));
readbuf = gwbuf_consume(readbuf, (size_t) nbytes_left);
ss_dassert(npackets_left > 0);
npackets_left -= 1;
nbytes_left = 0;
@ -1611,7 +1620,7 @@ static GWBUF* process_response_data(DCB* dcb,
/** Archive the command */
protocol_archive_srv_command(p);
}
/** Read next packet */
/** Read next packet */
else
{
uint8_t* data;
@ -1624,19 +1633,19 @@ static GWBUF* process_response_data(DCB* dcb,
{
MXS_DEBUG("%lu [%s] Read %d packets. Waiting for %d more "
"packets for a total of %d packets.",
pthread_self(),__FUNCTION__,
pthread_self(), __FUNCTION__,
initial_packets - npackets_left,
npackets_left,initial_packets);
npackets_left, initial_packets);
/** Store the already read data into the readqueue of the DCB
* and restore the response status to the initial number of packets */
dcb->dcb_readqueue = gwbuf_append(outbuf,dcb->dcb_readqueue);
dcb->dcb_readqueue = gwbuf_append(outbuf, dcb->dcb_readqueue);
protocol_set_response_status(p, initial_packets, initial_bytes);
return NULL;
}
data = GWBUF_DATA(readbuf);
nbytes_left = MYSQL_GET_PACKET_LEN(data)+MYSQL_HEADER_LEN;
nbytes_left = MYSQL_GET_PACKET_LEN(data) + MYSQL_HEADER_LEN;
/** Store new status to protocol structure */
protocol_set_response_status(p, npackets_left, nbytes_left);
}
@ -1645,12 +1654,11 @@ static GWBUF* process_response_data(DCB* dcb,
return outbuf;
}
static bool sescmd_response_complete(DCB* dcb)
{
int npackets_left;
ssize_t nbytes_left;
MySQLProtocol* p;
MySQLProtocol* p;
bool succp;
p = DCB_PROTOCOL(dcb, MySQLProtocol);

View File

@ -1566,8 +1566,6 @@ void check_drop_tmp_table(
char** tbl = NULL;
char *hkey,*dbname;
MYSQL_session* data;
DCB* master_dcb = NULL;
rses_property_t* rses_prop_tmp;
if(router_cli_ses == NULL || querybuf == NULL)
@ -1577,27 +1575,14 @@ void check_drop_tmp_table(
return;
}
if(router_cli_ses->rses_master_ref == NULL)
if(router_cli_ses->client_dcb == NULL)
{
MXS_ERROR("[%s] Error: Master server reference is NULL.",
__FUNCTION__);
MXS_ERROR("[%s] Error: Client DCB is NULL.", __FUNCTION__);
return;
}
rses_prop_tmp = router_cli_ses->rses_properties[RSES_PROP_TYPE_TMPTABLES];
master_dcb = router_cli_ses->rses_master_ref->bref_dcb;
if(master_dcb == NULL || master_dcb->session == NULL)
{
MXS_ERROR("[%s] Error: Master server DBC is NULL. "
"This means that the connection to the master server is already "
"closed while a query is still being routed.",__FUNCTION__);
return;
}
CHK_DCB(master_dcb);
data = (MYSQL_session*)master_dcb->session->data;
data = (MYSQL_session*)router_cli_ses->client_dcb->session->data;
if(data == NULL)
{
@ -1668,32 +1653,20 @@ static qc_query_type_t is_read_tmp_table(
return type;
}
if(router_cli_ses->rses_master_ref == NULL)
if(router_cli_ses->client_dcb == NULL)
{
MXS_ERROR("[%s] Error: Master server reference is NULL.",
__FUNCTION__);
MXS_ERROR("[%s] Error: Client DCB is NULL.", __FUNCTION__);
return type;
}
if (BREF_IS_IN_USE(router_cli_ses->rses_master_ref))
{
rses_prop_tmp = router_cli_ses->rses_properties[RSES_PROP_TYPE_TMPTABLES];
master_dcb = router_cli_ses->rses_master_ref->bref_dcb;
if(master_dcb == NULL || master_dcb->session == NULL)
{
MXS_ERROR("[%s] Error: Master server DBC is NULL. "
"This means that the connection to the master server is already "
"closed while a query is still being routed.",__FUNCTION__);
return qtype;
}
CHK_DCB(master_dcb);
data = (MYSQL_session*)master_dcb->session->data;
data = (MYSQL_session*)router_cli_ses->client_dcb->session->data;
if(data == NULL)
{
MXS_ERROR("[%s] Error: User data in master server DBC is NULL.",__FUNCTION__);
MXS_ERROR("[%s] Error: User data in client DBC is NULL.",__FUNCTION__);
return qtype;
}
@ -1765,7 +1738,6 @@ static void check_create_tmp_table(
int klen = 0;
char *hkey,*dbname;
MYSQL_session* data;
DCB* master_dcb = NULL;
rses_property_t* rses_prop_tmp;
HASHTABLE* h;
@ -1776,28 +1748,15 @@ static void check_create_tmp_table(
return;
}
if(router_cli_ses->rses_master_ref == NULL)
if(router_cli_ses->client_dcb == NULL)
{
MXS_ERROR("[%s] Error: Master server reference is NULL.",
__FUNCTION__);
MXS_ERROR("[%s] Error: Client DCB is NULL.", __FUNCTION__);
return;
}
router_cli_ses->have_tmp_tables = true;
rses_prop_tmp = router_cli_ses->rses_properties[RSES_PROP_TYPE_TMPTABLES];
master_dcb = router_cli_ses->rses_master_ref->bref_dcb;
if(master_dcb == NULL || master_dcb->session == NULL)
{
MXS_ERROR("[%s] Error: Master server DCB is NULL. "
"This means that the connection to the master server is already "
"closed while a query is still being routed.",__FUNCTION__);
return;
}
CHK_DCB(master_dcb);
data = (MYSQL_session*)master_dcb->session->data;
data = (MYSQL_session*)router_cli_ses->client_dcb->session->data;
if(data == NULL)
{
@ -2097,10 +2056,10 @@ static bool route_single_stmt(
* Read stored master DCB pointer. If master is not set, routing must
* be aborted
*/
if ((master_dcb = rses->rses_master_ref->bref_dcb) == NULL)
if ((master_dcb = rses->rses_master_ref->bref_dcb) == NULL ||
BREF_IS_CLOSED(rses->rses_master_ref))
{
char* query_str = modutil_get_query(querybuf);
CHK_DCB(master_dcb);
MXS_ERROR("Can't route %s:%s:\"%s\" to "
"backend server. Session doesn't have a Master "
"node",
@ -2112,6 +2071,7 @@ static bool route_single_stmt(
goto retblock;
}
CHK_DCB(master_dcb);
packet = GWBUF_DATA(querybuf);
packet_len = gw_mysql_get_byte3(packet);
@ -2491,7 +2451,6 @@ static bool route_single_stmt(
#if defined(SS_EXTRA_DEBUG)
MXS_INFO("Found DCB for slave.");
#endif
atomic_add(&inst->stats.n_slave, 1);
}
else
@ -2847,21 +2806,17 @@ static void clientReply (
uint8_t* replybuf = (uint8_t *)GWBUF_DATA(writebuf);
size_t len = MYSQL_GET_PACKET_LEN(buf);
size_t replylen = MYSQL_GET_PACKET_LEN(replybuf);
char* cmdstr = strndup(&((char *)buf)[5], len-4);
char* err = strndup(&((char *)replybuf)[8], 5);
char* replystr = strndup(&((char *)replybuf)[13],
replylen-4-5);
ss_dassert(len+4 == GWBUF_LENGTH(scur->scmd_cur_cmd->my_sescmd_buf));
MXS_ERROR("Failed to execute %s in %s:%d. %s %s",
cmdstr,
MXS_ERROR("Failed to execute session command in %s:%d. Error was: %s %s",
bref->bref_backend->backend_server->name,
bref->bref_backend->backend_server->port,
err,
replystr);
free(cmdstr);
free(err);
free(replystr);
}
@ -2893,11 +2848,9 @@ static void clientReply (
* This applies to session commands only. Counter decrement
* for other type of queries is done outside this block.
*/
if (writebuf != NULL && client_dcb != NULL)
{
/** Set response status as replied */
bref_clear_state(bref, BREF_WAITING_RESULT);
}
/** Set response status as replied */
bref_clear_state(bref, BREF_WAITING_RESULT);
}
/**
* Clear BREF_QUERY_ACTIVE flag and decrease waiter counter.
@ -3815,8 +3768,8 @@ static GWBUF* sescmd_cursor_process_replies(
if(bref->reply_cmd != scmd->reply_cmd)
{
MXS_INFO("Backend server '%s' response differs from master's response. "
"Closing connection.",
MXS_ERROR("Slave server '%s': response differs from master's response. "
"Closing connection due to inconsistent session state.",
bref->bref_backend->backend_server->unique_name);
sescmd_cursor_set_active(scur,false);
bref_clear_state(bref,BREF_QUERY_ACTIVE);

View File

@ -1767,7 +1767,7 @@ bool send_database_list(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client)
{
bool rval = false;
spinlock_acquire(&client->shardmap->lock);
if (client->shardmap->state == SHMAP_READY)
if (client->shardmap->state != SHMAP_UNINIT)
{
struct string_array strarray;
const int size = hashtable_size(client->shardmap->hash);
@ -1849,12 +1849,7 @@ static int routeQuery(ROUTER* instance,
char errbuf[26+MYSQL_DATABASE_MAXLEN];
CHK_CLIENT_RSES(router_cli_ses);
/** Dirty read for quick check if router is closed. */
if (router_cli_ses->rses_closed)
{
rses_is_closed = true;
}
ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(querybuf));
ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(querybuf));
if (!rses_begin_locked_router_action(router_cli_ses))
{
@ -4313,7 +4308,7 @@ int process_show_shards(ROUTER_CLIENT_SES* rses)
int rval = 0;
spinlock_acquire(&rses->shardmap->lock);
if (rses->shardmap->state == SHMAP_READY)
if(rses->shardmap->state != SHMAP_UNINIT)
{
HASHITERATOR* iter = hashtable_iterator(rses->shardmap->hash);
struct shard_list sl;
@ -4380,7 +4375,7 @@ bool handle_default_db(ROUTER_CLIENT_SES *router_cli_ses)
char* target = NULL;
spinlock_acquire(&router_cli_ses->shardmap->lock);
if (router_cli_ses->shardmap->state == SHMAP_READY)
if(router_cli_ses->shardmap->state != SHMAP_UNINIT)
{
target = hashtable_fetch(router_cli_ses->shardmap->hash, router_cli_ses->connect_db);
}