Merge branch 'develop' into firewall

This commit is contained in:
Markus Makela
2014-11-17 13:05:40 +02:00
18 changed files with 583 additions and 312 deletions

View File

@ -297,6 +297,7 @@ typedef struct {
#define MYSQL_GET_STMTOK_NATTR(payload) (gw_mysql_get_byte2(&payload[11]))
#define MYSQL_IS_ERROR_PACKET(payload) (MYSQL_GET_COMMAND(payload)==0xff)
#define MYSQL_IS_COM_QUIT(payload) (MYSQL_GET_COMMAND(payload)==0x01)
#define MYSQL_IS_CHANGE_USER(payload) (MYSQL_GET_COMMAND(payload)==0x11)
#define MYSQL_GET_NATTR(payload) ((int)payload[4])
#endif /** _MYSQL_PROTOCOL_H */
@ -314,6 +315,7 @@ int gw_send_authentication_to_backend(
char *user,
uint8_t *passwd,
MySQLProtocol *protocol);
const char *gw_mysql_protocol_state2string(int state);
int gw_do_connect_to_backend(char *host, int port, int* fd);
int mysql_send_com_quit(DCB* dcb, int packet_number, GWBUF* buf);
@ -335,6 +337,11 @@ int gw_send_change_user_to_backend(
char *user,
uint8_t *passwd,
MySQLProtocol *protocol);
GWBUF* gw_create_change_user_packet(
MYSQL_session* mses,
MySQLProtocol* protocol);
int gw_find_mysql_user_password_sha1(
char *username,
uint8_t *gateway_password,

View File

@ -49,6 +49,8 @@ MODULE_INFO info = {
"An experimental HTTPD implementation for use in admnistration"
};
extern int lm_enabled_logfiles_bitmask;
#define ISspace(x) isspace((int)(x))
#define HTTP_SERVER_STRING "MaxScale(c) v.1.0.0"
static char *version_str = "V1.0.1";
@ -413,9 +415,7 @@ int syseno = 0;
rc = listen(listener->fd, SOMAXCONN);
if (rc == 0) {
fprintf(stderr,
"Listening http connections at %s\n",
config);
LOGIF(LM, (skygw_log_write_flush(LOGFILE_MESSAGE,"Listening httpd connections at %s", config)));
} else {
int eno = errno;
errno = 0;

View File

@ -539,10 +539,13 @@ static int gw_read_backend_event(DCB *dcb) {
goto return_rc;
}
}
/*<
* If dcb->session->client is freed already it may be NULL.
/**
* Check that session is operable, and that client DCB is
* still listening the socket for replies.
*/
if (dcb->session->client != NULL)
if (dcb->session->state == SESSION_STATE_ROUTER_READY &&
dcb->session->client != NULL &&
dcb->session->client->state == DCB_STATE_POLLING)
{
client_protocol = SESSION_PROTOCOL(dcb->session,
MySQLProtocol);
@ -571,6 +574,10 @@ static int gw_read_backend_event(DCB *dcb) {
rc = 1;
}
}
else /*< session is closing; replying to client isn't possible */
{
gwbuf_free(read_buffer);
}
}
return_rc:
@ -1082,6 +1089,10 @@ gw_backend_close(DCB *dcb)
session = dcb->session;
CHK_SESSION(session);
LOGIF(LD, (skygw_log_write(LOGFILE_DEBUG,
"%lu [gw_backend_close]",
pthread_self())));
quitbuf = mysql_create_com_quit(NULL, 0);
gwbuf_set_type(quitbuf, GWBUF_TYPE_MYSQL);
@ -1157,7 +1168,24 @@ static int backend_write_delayqueue(DCB *dcb)
localq = dcb->delayq;
dcb->delayq = NULL;
spinlock_release(&dcb->delayqlock);
rc = dcb_write(dcb, localq);
if (MYSQL_IS_CHANGE_USER(((uint8_t *)GWBUF_DATA(localq))))
{
MYSQL_session* mses;
GWBUF* new_packet;
mses = (MYSQL_session *)dcb->session->client->data;
new_packet = gw_create_change_user_packet(
mses,
(MySQLProtocol *)dcb->protocol);
/**
* Remove previous packet which lacks scramble
* and append the new.
*/
localq = gwbuf_consume(localq, GWBUF_LENGTH(localq));
localq = gwbuf_append(localq, new_packet);
}
rc = dcb_write(dcb, localq);
}
if (rc == 0)
@ -1289,13 +1317,25 @@ static int gw_change_user(
* decode the token and check the password.
* Note: if auth_token_len == 0 && auth_token == NULL, user is without password
*/
auth_ret = gw_check_mysql_scramble_data(backend->session->client, auth_token, auth_token_len, client_protocol->scramble, sizeof(client_protocol->scramble), username, client_sha1);
auth_ret = gw_check_mysql_scramble_data(backend->session->client,
auth_token,
auth_token_len,
client_protocol->scramble,
sizeof(client_protocol->scramble),
username,
client_sha1);
if (auth_ret != 0) {
if (!service_refresh_users(backend->session->client->service)) {
/* Try authentication again with new repository data */
/* Note: if no auth client authentication will fail */
auth_ret = gw_check_mysql_scramble_data(backend->session->client, auth_token, auth_token_len, client_protocol->scramble, sizeof(client_protocol->scramble), username, client_sha1);
auth_ret = gw_check_mysql_scramble_data(
backend->session->client,
auth_token, auth_token_len,
client_protocol->scramble,
sizeof(client_protocol->scramble),
username,
client_sha1);
}
}
@ -1359,7 +1399,7 @@ static int gw_change_user(
rv = gw_send_change_user_to_backend(database, username, client_sha1, backend_protocol);
/*
* Now copy new data into user session
*/
*/
strcpy(current_session->user, username);
strcpy(current_session->db, database);
memcpy(current_session->client_sha1, client_sha1, sizeof(current_session->client_sha1));

View File

@ -479,7 +479,8 @@ static int gw_mysql_do_authentication(DCB *dcb, GWBUF *queue) {
auth_ret = gw_check_mysql_scramble_data(dcb,
auth_token,
auth_token_len,
protocol->scramble, sizeof(protocol->scramble),
protocol->scramble,
sizeof(protocol->scramble),
username,
stage1_hash);
@ -491,7 +492,14 @@ static int gw_mysql_do_authentication(DCB *dcb, GWBUF *queue) {
if (!service_refresh_users(dcb->service)) {
/* Try authentication again with new repository data */
/* Note: if no auth client authentication will fail */
auth_ret = gw_check_mysql_scramble_data(dcb, auth_token, auth_token_len, protocol->scramble, sizeof(protocol->scramble), username, stage1_hash);
auth_ret = gw_check_mysql_scramble_data(
dcb,
auth_token,
auth_token_len,
protocol->scramble,
sizeof(protocol->scramble),
username,
stage1_hash);
}
}
@ -1047,9 +1055,7 @@ int gw_MySQLListener(
rc = listen(l_so, 10 * SOMAXCONN);
if (rc == 0) {
fprintf(stderr,
"Listening MySQL connections at %s\n",
config_bind);
LOGIF(LM, (skygw_log_write_flush(LOGFILE_MESSAGE,"Listening MySQL connections at %s", config_bind)));
} else {
int eno = errno;
errno = 0;
@ -1386,6 +1392,10 @@ gw_client_close(DCB *dcb)
CHK_PROTOCOL(protocol);
}
#endif
LOGIF(LD, (skygw_log_write(LOGFILE_DEBUG,
"%lu [gw_client_close]",
pthread_self())));
mysql_protocol_done(dcb);
session = dcb->session;

View File

@ -275,8 +275,7 @@ int gw_read_backend_handshake(
payload += 4;
//Now decode mysql handshake
success = gw_decode_mysql_server_handshake(conn,
payload);
success = gw_decode_mysql_server_handshake(conn, payload);
if (success < 0) {
/* MySQL handshake has not been properly decoded
@ -1054,197 +1053,226 @@ int mysql_send_custom_error (
return dcb->func.write(dcb, buf);
}
/**
* Create COM_CHANGE_USER packet and store it to GWBUF
*
* @param mses MySQL session
* @param protocol protocol structure of the backend
*
* @return GWBUF buffer consisting of COM_CHANGE_USER packet
*
* @note the function doesn't fail
*/
GWBUF* gw_create_change_user_packet(
MYSQL_session* mses,
MySQLProtocol* protocol)
{
char* db;
char* user;
uint8_t* pwd;
GWBUF* buffer;
int compress = 0;
uint8_t* payload = NULL;
uint8_t* payload_start = NULL;
long bytes;
uint8_t client_scramble[GW_MYSQL_SCRAMBLE_SIZE];
uint32_t server_capabilities = 0;
uint32_t final_capabilities = 0;
char dbpass[MYSQL_USER_MAXLEN + 1]="";
char* curr_db = NULL;
uint8_t* curr_passwd = NULL;
unsigned int charset;
db = mses->db;
user = mses->user;
pwd = mses->client_sha1;
if (strlen(db) > 0)
{
curr_db = db;
}
if (strlen((char *)pwd) > 0)
{
curr_passwd = pwd;
}
final_capabilities = gw_mysql_get_byte4((uint8_t *)&server_capabilities);
/** Copy client's flags to backend */
final_capabilities |= protocol->client_capabilities;
/* get charset the client sent and use it for connection auth */
charset = protocol->charset;
if (compress)
{
final_capabilities |= GW_MYSQL_CAPABILITIES_COMPRESS;
#ifdef DEBUG_MYSQL_CONN
fprintf(stderr, ">>>> Backend Connection with compression\n");
#endif
}
if (curr_passwd != NULL)
{
uint8_t hash1[GW_MYSQL_SCRAMBLE_SIZE]="";
uint8_t hash2[GW_MYSQL_SCRAMBLE_SIZE]="";
uint8_t new_sha[GW_MYSQL_SCRAMBLE_SIZE]="";
/** hash1 is the function input, SHA1(real_password) */
memcpy(hash1, pwd, GW_MYSQL_SCRAMBLE_SIZE);
/**
* hash2 is the SHA1(input data), where
* input_data = SHA1(real_password)
*/
gw_sha1_str(hash1, GW_MYSQL_SCRAMBLE_SIZE, hash2);
/** dbpass is the HEX form of SHA1(SHA1(real_password)) */
gw_bin2hex(dbpass, hash2, GW_MYSQL_SCRAMBLE_SIZE);
/** new_sha is the SHA1(CONCAT(scramble, hash2) */
gw_sha1_2_str(protocol->scramble,
GW_MYSQL_SCRAMBLE_SIZE,
hash2,
GW_MYSQL_SCRAMBLE_SIZE,
new_sha);
/** compute the xor in client_scramble */
gw_str_xor(client_scramble,
new_sha, hash1,
GW_MYSQL_SCRAMBLE_SIZE);
}
if (curr_db == NULL)
{
final_capabilities &= ~GW_MYSQL_CAPABILITIES_CONNECT_WITH_DB;
}
else
{
final_capabilities |= GW_MYSQL_CAPABILITIES_CONNECT_WITH_DB;
}
final_capabilities |= GW_MYSQL_CAPABILITIES_PLUGIN_AUTH;
/**
* Protocol MySQL COM_CHANGE_USER for CLIENT_PROTOCOL_41
* 1 byte COMMAND
*/
bytes = 1;
/** add the user and a terminating char */
bytes += strlen(user);
bytes++;
/**
* next will be + 1 (scramble_len) + 20 (fixed_scramble) +
* (db + NULL term) + 2 bytes charset
*/
if (curr_passwd != NULL)
{
bytes += GW_MYSQL_SCRAMBLE_SIZE;
}
/** 1 byte for scramble_len */
bytes++;
/** db name and terminating char */
if (curr_db != NULL)
{
bytes += strlen(curr_db);
}
bytes++;
/** the charset */
bytes += 2;
bytes += strlen("mysql_native_password");
bytes++;
/** the packet header */
bytes += 4;
buffer = gwbuf_alloc(bytes);
/**
* Set correct type to GWBUF so that it will be handled like session
* commands
*/
buffer->gwbuf_type =
GWBUF_TYPE_MYSQL|GWBUF_TYPE_SINGLE_STMT|GWBUF_TYPE_SESCMD;
payload = GWBUF_DATA(buffer);
memset(payload, '\0', bytes);
payload_start = payload;
/** set packet number to 0 */
payload[3] = 0x00;
payload += 4;
/** set the command COM_CHANGE_USER 0x11 */
payload[0] = 0x11;
payload++;
memcpy(payload, user, strlen(user));
payload += strlen(user);
payload++;
if (curr_passwd != NULL)
{
/** set the auth-length */
*payload = GW_MYSQL_SCRAMBLE_SIZE;
payload++;
/**
* copy the 20 bytes scramble data after
* packet_buffer+36+user+NULL+1 (byte of auth-length)
*/
memcpy(payload, client_scramble, GW_MYSQL_SCRAMBLE_SIZE);
payload += GW_MYSQL_SCRAMBLE_SIZE;
}
else
{
/** skip the auth-length and write a NULL */
payload++;
}
/** if the db is not NULL append it */
if (curr_db != NULL)
{
memcpy(payload, curr_db, strlen(curr_db));
payload += strlen(curr_db);
}
payload++;
/** set the charset, 2 bytes */
*payload = charset;
payload++;
*payload = '\x00';
payload++;
memcpy(payload, "mysql_native_password", strlen("mysql_native_password"));
payload += strlen("mysql_native_password");
payload++;
/** put here the paylod size: bytes to write - 4 bytes packet header */
gw_mysql_set_byte3(payload_start, (bytes-4));
return buffer;
}
/**
* Write a MySQL CHANGE_USER packet to backend server
*
* @param conn MySQL protocol structure
* @param dbname The selected database
* @param user The selected user
* @param passwd The SHA1(real_password): Note real_password is unknown
* @param passwd The SHA1(real_password)
* @return 1 on success, 0 on failure
*/
int gw_send_change_user_to_backend(
char *dbname,
char *user,
uint8_t *passwd,
MySQLProtocol *conn)
char *dbname,
char *user,
uint8_t *passwd,
MySQLProtocol *conn)
{
int compress = 0;
int rv;
uint8_t *payload = NULL;
uint8_t *payload_start = NULL;
long bytes;
uint8_t client_scramble[GW_MYSQL_SCRAMBLE_SIZE];
uint8_t client_capabilities[4];
uint32_t server_capabilities = 0;
uint32_t final_capabilities = 0;
char dbpass[MYSQL_USER_MAXLEN + 1]="";
GWBUF *buffer;
DCB *dcb;
char *curr_db = NULL;
uint8_t *curr_passwd = NULL;
unsigned int charset;
if (strlen(dbname))
curr_db = dbname;
if (strlen((char *)passwd))
curr_passwd = passwd;
dcb = conn->owner_dcb;
final_capabilities = gw_mysql_get_byte4((uint8_t *)&server_capabilities);
/** Copy client's flags to backend */
final_capabilities |= conn->client_capabilities;
/* get charset the client sent and use it for connection auth */
charset = conn->charset;
if (compress) {
final_capabilities |= GW_MYSQL_CAPABILITIES_COMPRESS;
#ifdef DEBUG_MYSQL_CONN
fprintf(stderr, ">>>> Backend Connection with compression\n");
#endif
}
if (curr_passwd != NULL) {
uint8_t hash1[GW_MYSQL_SCRAMBLE_SIZE]="";
uint8_t hash2[GW_MYSQL_SCRAMBLE_SIZE]="";
uint8_t new_sha[GW_MYSQL_SCRAMBLE_SIZE]="";
// hash1 is the function input, SHA1(real_password)
memcpy(hash1, passwd, GW_MYSQL_SCRAMBLE_SIZE);
// hash2 is the SHA1(input data), where input_data = SHA1(real_password)
gw_sha1_str(hash1, GW_MYSQL_SCRAMBLE_SIZE, hash2);
// dbpass is the HEX form of SHA1(SHA1(real_password))
gw_bin2hex(dbpass, hash2, GW_MYSQL_SCRAMBLE_SIZE);
// new_sha is the SHA1(CONCAT(scramble, hash2)
gw_sha1_2_str(conn->scramble, GW_MYSQL_SCRAMBLE_SIZE, hash2, GW_MYSQL_SCRAMBLE_SIZE, new_sha);
// compute the xor in client_scramble
gw_str_xor(client_scramble, new_sha, hash1, GW_MYSQL_SCRAMBLE_SIZE);
}
if (curr_db == NULL) {
// without db
final_capabilities &= ~GW_MYSQL_CAPABILITIES_CONNECT_WITH_DB;
} else {
final_capabilities |= GW_MYSQL_CAPABILITIES_CONNECT_WITH_DB;
}
final_capabilities |= GW_MYSQL_CAPABILITIES_PLUGIN_AUTH;
gw_mysql_set_byte4(client_capabilities, final_capabilities);
// Protocol MySQL COM_CHANGE_USER for CLIENT_PROTOCOL_41
// 1 byte COMMAND
bytes = 1;
// add the user
bytes += strlen(user);
// NULL byte for user string
bytes++;
// next will be + 1 (scramble_len) + 20 (fixed_scramble) + (dbname + NULL term) + 2 bytes charset
if (curr_passwd != NULL) {
bytes += GW_MYSQL_SCRAMBLE_SIZE;
}
// 1 byte for scramble_len
bytes++;
if (curr_db != NULL) {
bytes += strlen(curr_db);
}
// NULL byte for dbname string
bytes++;
// the charset
bytes += 2;
bytes += strlen("mysql_native_password");
bytes++;
// the packet header
bytes += 4;
// allocating the GWBUF
buffer = gwbuf_alloc(bytes);
/**
* Set correct type to GWBUF so that it will be handled like session
* commands should
*/
buffer->gwbuf_type = GWBUF_TYPE_MYSQL|GWBUF_TYPE_SINGLE_STMT|GWBUF_TYPE_SESCMD;
payload = GWBUF_DATA(buffer);
// clearing data
memset(payload, '\0', bytes);
GWBUF *buffer;
int rc;
MYSQL_session* mses;
// save the start pointer
payload_start = payload;
mses = (MYSQL_session*)conn->owner_dcb->session->client->data;
buffer = gw_create_change_user_packet(mses, conn);
rc = conn->owner_dcb->func.write(conn->owner_dcb, buffer);
// set packet # = 1
payload[3] = 0x00;
payload += 4;
// set the command COM_CHANGE_USER \x11
payload[0] = 0x11;
payload++;
memcpy(payload, user, strlen(user));
payload += strlen(user);
payload++;
if (curr_passwd != NULL) {
// set the auth-length
*payload = GW_MYSQL_SCRAMBLE_SIZE;
payload++;
//copy the 20 bytes scramble data after packet_buffer+36+user+NULL+1 (byte of auth-length)
memcpy(payload, client_scramble, GW_MYSQL_SCRAMBLE_SIZE);
payload += GW_MYSQL_SCRAMBLE_SIZE;
} else {
// skip the auth-length and write a NULL
payload++;
}
// if the db is not NULL append it
if (curr_db != NULL) {
memcpy(payload, curr_db, strlen(curr_db));
payload += strlen(curr_db);
payload++;
} else {
// skip the NULL
payload++;
if (rc != 0)
{
rc = 1;
}
// set the charset, 2 bytes!!!!
*payload = charset;
payload++;
*payload = '\x00';
payload++;
memcpy(payload, "mysql_native_password", strlen("mysql_native_password"));
payload += strlen("mysql_native_password");
payload++;
// put here the paylod size: bytes to write - 4 bytes packet header
gw_mysql_set_byte3(payload_start, (bytes-4));
rv = dcb->func.write(dcb, buffer);
if (rv == 0)
return 0;
else
return 1;
return rc;
}
/**
@ -1762,13 +1790,13 @@ void protocol_archive_srv_command(
}
s1 = &p->protocol_command;
#if defined(EXTRA_SS_DEBUG)
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Move command %s from fd %d to command history.",
STRPACKETTYPE(s1->scom_cmd),
p->owner_dcb->fd)));
#endif
/** Copy to history list */
if ((h1 = p->protocol_cmd_history) == NULL)
{
@ -1846,7 +1874,7 @@ void protocol_add_srv_command(
STRPACKETTYPE(cmd),
p->owner_dcb->fd)));
#if defined(SS_DEBUG)
#if defined(EXTRA_SS_DEBUG)
c = &p->protocol_command;
while (c != NULL && c->scom_cmd != MYSQL_COM_UNDEFINED)

View File

@ -390,9 +390,7 @@ int syseno = 0;
rc = listen(listener->fd, SOMAXCONN);
if (rc == 0) {
fprintf(stderr,
"Listening telnet connections at %s\n",
config);
LOGIF(LM, (skygw_log_write_flush(LOGFILE_MESSAGE,"Listening telnet connections at %s", config)));
} else {
int eno = errno;
errno = 0;

View File

@ -490,6 +490,81 @@ struct subcommand removeoptions[] = {
}
};
/**
* User command to flush a single logfile
*
* @param pdcb The stream to write output to
* @param logname The name of the log
*/
static void
flushlog(DCB *pdcb, char *logname)
{
if (logname == NULL)
{
}
else if (!strcasecmp(logname, "error"))
{
skygw_log_rotate(LOGFILE_ERROR);
}
else if (!strcasecmp(logname, "message"))
{
skygw_log_rotate(LOGFILE_MESSAGE);
}
else if (!strcasecmp(logname, "trace"))
{
skygw_log_rotate(LOGFILE_TRACE);
}
else if (!strcasecmp(logname, "debug"))
{
skygw_log_rotate(LOGFILE_DEBUG);
}
else
{
dcb_printf(pdcb, "Unexpected logfile name, expected "
"error, message, trace oe debug.\n");
}
}
/**
* User command to flush all logfiles
*
* @param pdcb The stream to write output to
*/
static void
flushlogs(DCB *pdcb)
{
skygw_log_rotate(LOGFILE_ERROR);
skygw_log_rotate(LOGFILE_MESSAGE);
skygw_log_rotate(LOGFILE_TRACE);
skygw_log_rotate(LOGFILE_DEBUG);
}
/**
* The subcommands of the flush command
*/
struct subcommand flushoptions[] = {
{
"log",
1,
flushlog,
"Flush the content of a log file, close that log, rename it and open a new log file",
"Flush the content of a log file, close that log, rename it and open a new log file",
{ARG_TYPE_STRING, 0, 0}
},
{
"logs",
0,
flushlogs,
"Flush the content of all log files, close that logs, rename them and open a new log files",
"Flush the content of all log files, close that logs, rename them and open a new log files",
{0, 0, 0}
},
{
NULL, 0, NULL, NULL, NULL, {0, 0, 0}
}
};
/**
* The debug command table
@ -505,6 +580,7 @@ static struct {
#if defined(FAKE_CODE)
{ "fail", failoptions },
#endif /* FAKE_CODE */
{ "flush", flushoptions },
{ "list", listoptions },
{ "reload", reloadoptions },
{ "remove", removeoptions },

View File

@ -267,7 +267,11 @@ static bool handle_error_new_connection(
ROUTER_CLIENT_SES* rses,
DCB* backend_dcb,
GWBUF* errmsg);
static void handle_error_reply_client(SESSION* ses, GWBUF* errmsg);
static void handle_error_reply_client(
SESSION* ses,
ROUTER_CLIENT_SES* rses,
DCB* backend_dcb,
GWBUF* errmsg);
static backend_ref_t* get_root_master_bref(ROUTER_CLIENT_SES* rses);
@ -908,6 +912,10 @@ static void closeSession(
ROUTER_CLIENT_SES* router_cli_ses;
backend_ref_t* backend_ref;
LOGIF(LD, (skygw_log_write(LOGFILE_DEBUG,
"%lu [RWSplit:closeSession]",
pthread_self())));
/**
* router session can be NULL if newSession failed and it is discarding
* its connections and DCB's.
@ -926,18 +934,7 @@ static void closeSession(
if (!router_cli_ses->rses_closed &&
rses_begin_locked_router_action(router_cli_ses))
{
int i = 0;
/**
* session must be moved to SESSION_STATE_STOPPING state before
* router session is closed.
*/
#if defined(SS_DEBUG)
SESSION* ses = get_session_by_router_ses((void*)router_cli_ses);
ss_dassert(ses != NULL);
ss_dassert(ses->state == SESSION_STATE_STOPPING);
#endif
int i;
/**
* This sets router closed. Nobody is allowed to use router
* whithout checking this first.
@ -947,13 +944,22 @@ static void closeSession(
for (i=0; i<router_cli_ses->rses_nbackends; i++)
{
backend_ref_t* bref = &backend_ref[i];
DCB* dcb = bref->bref_dcb;
DCB* dcb = bref->bref_dcb;
/** Close those which had been connected */
if (BREF_IS_IN_USE(bref))
{
CHK_DCB(dcb);
/** Clean operation counter in bref and in SERVER */
#if defined(SS_DEBUG)
/**
* session must be moved to SESSION_STATE_STOPPING state before
* router session is closed.
*/
if (dcb->session != NULL)
{
ss_dassert(dcb->session->state == SESSION_STATE_STOPPING);
}
#endif
/** Clean operation counter in bref and in SERVER */
while (BREF_IS_WAITING_RESULT(bref))
{
bref_clear_state(bref, BREF_WAITING_RESULT);
@ -1171,38 +1177,15 @@ static bool get_dcb(
"Warning : No slaves available "
"for the service %s.",
rses->router->service->name)));
}
btype = BE_MASTER;
if (BREF_IS_IN_USE(master_bref))
{
*p_dcb = master_bref->bref_dcb;
succp = true;
ss_dassert(master_bref->bref_dcb->state != DCB_STATE_ZOMBIE);
ss_dassert(
(master_bref->bref_backend &&
(master_bref->bref_backend->backend_server ==
master_bref->bref_backend->backend_server)) &&
smallest_nconn == -1);
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Using master %s:%d instead.",
master_bref->bref_backend->backend_server->name,
master_bref->bref_backend->backend_server->port)));
}
else
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : No master is availabe either. "
"Unable to find backend server for "
"routing.")));
}
}
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Warning : Using master %s:%d.",
master_bref->bref_backend->backend_server->name,
master_bref->bref_backend->backend_server->port)));
btype = BE_MASTER;
}
/** Found slave, correct the status flag */
else if (rses->router->available_slaves == false)
{
rses->router->available_slaves = true;
@ -1211,26 +1194,31 @@ static bool get_dcb(
"At least one slave has become available for "
"the service %s.",
rses->router->service->name)));
goto return_succp;
}
ss_dassert(succp);
}
if (btype == BE_MASTER)
{
for (i=0; i<rses->rses_nbackends; i++)
{
BACKEND* b = backend_ref[i].bref_backend;
if (BREF_IS_IN_USE((&backend_ref[i])) &&
(master_bref->bref_backend &&
(b->backend_server ==
master_bref->bref_backend->backend_server)))
{
*p_dcb = backend_ref[i].bref_dcb;
succp = true;
goto return_succp;
}
}
if (BREF_IS_IN_USE(master_bref) &&
SERVER_IS_MASTER(master_bref->bref_backend->backend_server))
{
*p_dcb = master_bref->bref_dcb;
succp = true;
/** if bref is in use DCB should not be closed */
ss_dassert(master_bref->bref_dcb->state != DCB_STATE_ZOMBIE);
}
else
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Server at %s:%d should be master but "
"is %s instead and can't be chosen to master.",
master_bref->bref_backend->backend_server->name,
master_bref->bref_backend->backend_server->port,
STRSRVSTATUS(master_bref->bref_backend->backend_server))));
succp = false;
}
}
return_succp:
@ -1685,9 +1673,9 @@ void check_create_tmp_table(
* for buffering the partial query, a later call to the query router will
* contain the remainder, or part thereof of the query.
*
* @param instance The query router instance
* @param session The session associated with the client
* @param queue MaxScale buffer queue with the packets received
* @param instance The query router instance
* @param router_session The session associated with the client
* @param querybuf MaxScale buffer queue with received packet
*
* @return if succeed 1, otherwise 0
* If routeQuery fails, it means that router session has failed.
@ -4116,7 +4104,7 @@ static void handleError (
switch (action) {
case ERRACT_NEW_CONNECTION:
{
{
if (!rses_begin_locked_router_action(rses))
{
*succp = false;
@ -4133,24 +4121,28 @@ static void handleError (
"Session will be closed.")));
*succp = false;
rses_end_locked_router_action(rses);
return;
}
/**
* This is called in hope of getting replacement for
* failed slave(s).
*/
*succp = handle_error_new_connection(inst,
rses,
backend_dcb,
errmsgbuf);
else
{
/**
* This is called in hope of getting replacement for
* failed slave(s).
*/
*succp = handle_error_new_connection(inst,
rses,
backend_dcb,
errmsgbuf);
}
rses_end_locked_router_action(rses);
break;
}
case ERRACT_REPLY_CLIENT:
{
handle_error_reply_client(session, errmsgbuf);
handle_error_reply_client(session,
rses,
backend_dcb,
errmsgbuf);
*succp = false; /*< no new backend servers were made available */
break;
}
@ -4163,16 +4155,29 @@ static void handleError (
static void handle_error_reply_client(
SESSION* ses,
GWBUF* errmsg)
SESSION* ses,
ROUTER_CLIENT_SES* rses,
DCB* backend_dcb,
GWBUF* errmsg)
{
session_state_t sesstate;
DCB* client_dcb;
backend_ref_t* bref;
spinlock_acquire(&ses->ses_lock);
sesstate = ses->state;
client_dcb = ses->client;
spinlock_release(&ses->ses_lock);
/**
* If bref exists, mark it closed
*/
if ((bref = get_bref_from_dcb(rses, backend_dcb)) != NULL)
{
CHK_BACKEND_REF(bref);
bref_clear_state(bref, BREF_IN_USE);
bref_set_state(bref, BREF_CLOSED);
}
if (sesstate == SESSION_STATE_ROUTER_READY)
{
@ -4696,7 +4701,7 @@ static backend_ref_t* get_root_master_bref(
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Could not find master among the backend "
"servers. Previous master state : %s",
"servers. Previous master's state : %s",
STRSRVSTATUS(BREFSRV(rses->rses_master_ref)))));
}
return candidate_bref;

View File

@ -1,7 +1,7 @@
add_test(NAME ReadWriteSplitTest COMMAND ${CMAKE_CURRENT_SOURCE_DIR}/rwsplit.sh testrwsplit.log ${TEST_HOST} ${TEST_PORT_RW} ${TEST_MASTER_ID} ${TEST_USER} ${TEST_PASSWORD} ${CMAKE_CURRENT_SOURCE_DIR})
if(MYSQLCLIENT_FOUND)
add_test(NAME ReadWriteSplitLoginTest COMMAND $<TARGET_FILE:testconnect> 10000 ${TEST_HOST} ${MASTER_PORT} ${TEST_HOST} ${TEST_PORT_RW} 1.10)
add_test(NAME ReadWriteSplitAuthTest COMMAND $<TARGET_FILE:testconnect> 10000 ${TEST_HOST} ${MASTER_PORT} ${TEST_HOST} ${TEST_PORT_RW} 1.10)
endif()
add_subdirectory(test_hints)

View File

@ -2,5 +2,5 @@ if(MYSQLCLIENT_FOUND)
add_executable(testconnect testconnect.c)
message(STATUS "Linking against: ${MYSQLCLIENT_LIBRARIES}")
target_link_libraries(testconnect ${MYSQLCLIENT_LIBRARIES} ssl crypto dl z m rt pthread)
add_test(NAME ReadConnRouterLoginTest COMMAND $<TARGET_FILE:testconnect> 10000 ${TEST_HOST} ${MASTER_PORT} ${TEST_HOST} ${TEST_PORT} 1.10)
add_test(NAME ReadConnRouterAuthTest COMMAND $<TARGET_FILE:testconnect> 10000 ${TEST_HOST} ${MASTER_PORT} ${TEST_HOST} ${TEST_PORT} 1.10)
endif()