Merge branch '2.1.7' into develop-2.1-merge

This commit is contained in:
Johan Wikman
2017-09-12 11:08:02 +03:00
28 changed files with 367 additions and 90 deletions

View File

@ -661,6 +661,27 @@ static inline bool collecting_resultset(MySQLProtocol *proto, uint64_t capabilit
proto->collect_result;
}
/**
* Helpers for checking OK and ERR packets specific to COM_CHANGE_USER
*/
static inline bool not_ok_packet(const GWBUF* buffer)
{
uint8_t* data = GWBUF_DATA(buffer);
return data[4] != MYSQL_REPLY_OK ||
// Should be more than 7 bytes of payload
gw_mysql_get_byte3(data) < MYSQL_OK_PACKET_MIN_LEN - MYSQL_HEADER_LEN ||
// Should have no affected rows
data[5] != 0 ||
// Should not generate an insert ID
data[6] != 0;
}
static inline bool not_err_packet(const GWBUF* buffer)
{
return GWBUF_DATA(buffer)[4] != MYSQL_REPLY_ERR;
}
/**
* @brief With authentication completed, read new data and write to backend
*
@ -701,8 +722,9 @@ gw_read_and_write(DCB *dcb)
/** Ask what type of output the router/filter chain expects */
uint64_t capabilities = service_get_capabilities(session->service);
bool result_collected = false;
MySQLProtocol *proto = (MySQLProtocol *)dcb->protocol;
if (rcap_type_required(capabilities, RCAP_TYPE_STMT_OUTPUT))
if (rcap_type_required(capabilities, RCAP_TYPE_STMT_OUTPUT) || (proto->ignore_replies != 0))
{
GWBUF *tmp = modutil_get_complete_packets(&read_buffer);
/* Put any residue into the read queue */
@ -717,10 +739,9 @@ gw_read_and_write(DCB *dcb)
read_buffer = tmp;
MySQLProtocol *proto = (MySQLProtocol*)dcb->protocol;
if (rcap_type_required(capabilities, RCAP_TYPE_CONTIGUOUS_OUTPUT) ||
proto->collect_result)
proto->collect_result ||
proto->ignore_replies != 0)
{
if ((tmp = gwbuf_make_contiguous(read_buffer)))
{
@ -767,39 +788,67 @@ gw_read_and_write(DCB *dcb)
}
}
MySQLProtocol *proto = (MySQLProtocol *)dcb->protocol;
if (proto->ignore_replies > 0)
{
/**
* The reply to an ignorable command is in the packet. Extract the
* response type and discard the response.
*/
uint8_t result = 0xff;
gwbuf_copy_data(read_buffer, MYSQL_HEADER_LEN, 1, &result);
proto->ignore_replies--;
ss_dassert(proto->ignore_replies >= 0);
gwbuf_free(read_buffer);
int rval = 0;
/** The reply to a COM_CHANGE_USER is in packet */
GWBUF *query = proto->stored_query;
proto->stored_query = NULL;
proto->ignore_replies--;
ss_dassert(proto->ignore_replies >= 0);
GWBUF* reply = modutil_get_next_MySQL_packet(&read_buffer);
while (read_buffer)
{
/** Skip to the last packet if we get more than one */
gwbuf_free(reply);
reply = modutil_get_next_MySQL_packet(&read_buffer);
}
ss_dassert(reply);
ss_dassert(!read_buffer);
uint8_t result = MYSQL_GET_COMMAND(GWBUF_DATA(reply));
int rval = 0;
if (result == MYSQL_REPLY_OK)
{
MXS_INFO("Response to COM_CHANGE_USER is OK, writing stored query");
rval = query ? dcb->func.write(dcb, query) : 1;
}
else if (query)
else
{
/**
* The ignorable command failed when we had a queued query from the
* client. Generate a fake hangup event to close the DCB and send
* an error to the client.
*/
if (result == MYSQL_REPLY_ERR)
{
/** The COM_CHANGE USER failed, generate a fake hangup event to
* close the DCB and send an error to the client. */
log_error_response(dcb, reply);
}
else if (result == MYSQL_REPLY_AUTHSWITCHREQUEST &&
gwbuf_length(reply) > MYSQL_EOF_PACKET_LEN)
{
MXS_ERROR("Received AuthSwitchRequest to '%s' when '%s' was expected",
(char*)GWBUF_DATA(reply) + 5, DEFAULT_MYSQL_AUTH_PLUGIN);
}
else
{
/** This should never happen */
MXS_ERROR("Unknown response to COM_CHANGE_USER (0x%02hhx), "
"ignoring and waiting for correct result", result);
gwbuf_free(reply);
proto->stored_query = query;
proto->ignore_replies++;
return 1;
}
gwbuf_free(query);
poll_fake_hangup_event(dcb);
}
gwbuf_free(reply);
return rval;
}
@ -947,10 +996,28 @@ static int gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
CHK_DCB(dcb);
if (dcb->was_persistent && dcb->state == DCB_STATE_POLLING &&
backend_protocol->protocol_auth_state == MXS_AUTH_STATE_COMPLETE)
if (dcb->was_persistent)
{
ss_dassert(!dcb->fakeq);
ss_dassert(!dcb->readq);
ss_dassert(!dcb->delayq);
ss_dassert(!dcb->writeq);
ss_dassert(dcb->persistentstart == 0);
dcb->was_persistent = false;
ss_dassert(backend_protocol->ignore_replies >= 0);
backend_protocol->ignore_replies = 0;
if (dcb->state != DCB_STATE_POLLING ||
backend_protocol->protocol_auth_state != MXS_AUTH_STATE_COMPLETE)
{
MXS_INFO("DCB and protocol state do not qualify for pooling: %s, %s",
STRDCBSTATE(dcb->state),
STRPROTOCOLSTATE(backend_protocol->protocol_auth_state));
gwbuf_free(queue);
return 0;
}
/**
* This is a DCB that was just taken out of the persistent connection pool.
* We need to sent a COM_CHANGE_USER query to the backend to reset the
@ -962,18 +1029,45 @@ static int gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
* response is received. */
gwbuf_free(backend_protocol->stored_query);
}
dcb->was_persistent = false;
backend_protocol->ignore_replies++;
ss_dassert(backend_protocol->ignore_replies > 0);
backend_protocol->stored_query = queue;
if (MYSQL_IS_COM_QUIT(GWBUF_DATA(queue)))
{
/** The connection is being closed before the first write to this
* backend was done. The COM_QUIT is ignored and the DCB will be put
* back into the pool once it's closed. */
MXS_INFO("COM_QUIT received as the first write, ignoring and "
"sending the DCB back to the pool.");
gwbuf_free(queue);
return 1;
}
GWBUF *buf = gw_create_change_user_packet(dcb->session->client_dcb->data, dcb->protocol);
return dcb_write(dcb, buf) ? 1 : 0;
int rc = 0;
if (dcb_write(dcb, buf))
{
MXS_INFO("Sent COM_CHANGE_USER");
backend_protocol->ignore_replies++;
backend_protocol->stored_query = queue;
rc = 1;
}
else
{
gwbuf_free(queue);
}
return rc;
}
else if (backend_protocol->ignore_replies > 0)
{
if (MYSQL_IS_COM_QUIT((uint8_t*)GWBUF_DATA(queue)))
{
/** The COM_CHANGE_USER was already sent but the session is already
* closing. We ignore the COM_QUIT in the hopes that the response
* to the COM_CHANGE_USER comes before the DCB is closed. If the
* DCB is closed before the response arrives, the connection will
* not qualify the persistent connection pool. */
MXS_INFO("COM_QUIT received while COM_CHANGE_USER is in progress, ignoring");
gwbuf_free(queue);
}
else
@ -981,8 +1075,10 @@ static int gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
/**
* We're still waiting on the reply to the COM_CHANGE_USER, append the
* buffer to the stored query. This is possible if the client sends
* BLOB data on the first command.
* BLOB data on the first command or is sending multiple COM_QUERY
* packets at one time.
*/
MXS_INFO("COM_CHANGE_USER in progress, appending query to queue");
backend_protocol->stored_query = gwbuf_append(backend_protocol->stored_query, queue);
}
return 1;
@ -1210,6 +1306,8 @@ static void backend_set_delayqueue(DCB *dcb, GWBUF *queue)
static int backend_write_delayqueue(DCB *dcb, GWBUF *buffer)
{
ss_dassert(buffer);
ss_dassert(dcb->persistentstart == 0);
ss_dassert(!dcb->was_persistent);
if (MYSQL_IS_CHANGE_USER(((uint8_t *)GWBUF_DATA(buffer))))
{
@ -1958,5 +2056,8 @@ static bool get_ip_string_and_port(struct sockaddr_storage *sa,
static bool gw_connection_established(DCB* dcb)
{
MySQLProtocol *proto = (MySQLProtocol*)dcb->protocol;
return proto->protocol_auth_state == MXS_AUTH_STATE_COMPLETE;
return
proto->protocol_auth_state == MXS_AUTH_STATE_COMPLETE &&
(proto->ignore_replies == 0)
&& !proto->stored_query;
}

View File

@ -1038,6 +1038,19 @@ gw_read_finish_processing(DCB *dcb, GWBUF *read_buffer, uint64_t capabilities)
/** Reset error handler when routing of the new query begins */
dcb->dcb_errhandle_called = false;
if (proto->current_command == MYSQL_COM_QUIT)
{
/** The client is closing the connection. We know that this will be the
* last command the client sends so the backend connections are very likely
* to be in an idle state.
*
* If the client is pipelining the queries (i.e. sending N request as
* a batch and then expecting N responses) then it is possible that
* the backend connections are not idle when the COM_QUIT is received.
* In most cases we can assume that the connections are idle. */
session_qualify_for_pool(session);
}
if (rcap_type_required(capabilities, RCAP_TYPE_STMT_INPUT))
{
/**

View File

@ -79,6 +79,7 @@ MySQLProtocol* mysql_protocol_init(DCB* dcb, int fd)
p->protocol_command.scom_nbytes_to_read = 0;
p->stored_query = NULL;
p->extra_capabilities = 0;
p->ignore_replies = 0;
#if defined(SS_DEBUG)
p->protocol_chk_top = CHK_NUM_PROTOCOL;
p->protocol_chk_tail = CHK_NUM_PROTOCOL;

View File

@ -635,38 +635,24 @@ avro_binlog_end_t avro_read_all_events(AVRO_INSTANCE *router)
MXS_DEBUG("%s(%x) - %llu", binlog_event_name(hdr.event_type), hdr.event_type, pos);
uint32_t original_size = hdr.event_size;
if (router->binlog_checksum)
{
hdr.event_size -= 4;
}
/* check for FORMAT DESCRIPTION EVENT */
if (hdr.event_type == FORMAT_DESCRIPTION_EVENT)
{
int event_header_length;
int event_header_ntypes;
const int BLRM_FDE_EVENT_TYPES_OFFSET = 2 + 50 + 4 + 1;
const int FDE_EXTRA_BYTES = 5;
int event_header_length = ptr[BLRM_FDE_EVENT_TYPES_OFFSET - 1];
int n_events = hdr.event_size - event_header_length - BLRM_FDE_EVENT_TYPES_OFFSET - FDE_EXTRA_BYTES;
uint8_t* checksum = ptr + hdr.event_size - event_header_length - FDE_EXTRA_BYTES;
/** Extract the event header lengths */
event_header_length = ptr[2 + 50 + 4];
event_header_ntypes = hdr.event_size - event_header_length - (2 + 50 + 4 + 1);
memcpy(router->event_type_hdr_lens, ptr + 2 + 50 + 5, event_header_ntypes);
router->event_types = event_header_ntypes;
switch (event_header_ntypes)
{
case 168: /* mariadb 10 LOG_EVENT_TYPES*/
event_header_ntypes -= 163;
break;
case 165: /* mariadb 5 LOG_EVENT_TYPES*/
event_header_ntypes -= 160;
break;
default: /* mysql 5.6 LOG_EVENT_TYPES = 35 */
event_header_ntypes -= 35;
break;
}
uint8_t *checksum = ptr + hdr.event_size - event_header_length - event_header_ntypes;
if (checksum[0] == 1)
{
found_chksum = true;
}
router->event_types = n_events;
router->binlog_checksum = checksum[0];
}
/* Decode CLOSE/STOP Event */
else if (hdr.event_type == STOP_EVENT)
@ -767,7 +753,7 @@ avro_binlog_end_t avro_read_all_events(AVRO_INSTANCE *router)
break;
}
if (hdr.next_pos > 0 && hdr.next_pos != (pos + hdr.event_size))
if (hdr.next_pos > 0 && hdr.next_pos != (pos + original_size))
{
MXS_INFO("Binlog %s: next pos %u != (pos %llu + event_size %u), truncating to %llu",
router->binlog_name, hdr.next_pos, pos, hdr.event_size, pos);
@ -1013,7 +999,7 @@ void handle_query_event(AVRO_INSTANCE *router, REP_HEADER *hdr, int *pending_tra
{
int dblen = ptr[DBNM_OFF];
int vblklen = ptr[VBLK_OFF];
int len = hdr->event_size - BINLOG_EVENT_HDR_LEN - (PHDR_OFF + vblklen + 1 + dblen) + 1;
int len = hdr->event_size - BINLOG_EVENT_HDR_LEN - (PHDR_OFF + vblklen + 1 + dblen);
char *sql = (char *) ptr + PHDR_OFF + vblklen + 1 + dblen;
char db[dblen + 1];
memcpy(db, (char*) ptr + PHDR_OFF + vblklen, dblen);

View File

@ -1175,9 +1175,9 @@ bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end)
{
tok = get_tok(tok + len, &len, end);
create->column_names = MXS_REALLOC(create->column_names, sizeof(char*) * create->columns + 1);
create->column_types = MXS_REALLOC(create->column_types, sizeof(char*) * create->columns + 1);
create->column_lengths = MXS_REALLOC(create->column_lengths, sizeof(int) * create->columns + 1);
create->column_names = MXS_REALLOC(create->column_names, sizeof(char*) * (create->columns + 1));
create->column_types = MXS_REALLOC(create->column_types, sizeof(char*) * (create->columns + 1));
create->column_lengths = MXS_REALLOC(create->column_lengths, sizeof(int) * (create->columns + 1));
char avro_token[len + 1];
make_avro_token(avro_token, tok, len);
@ -1208,9 +1208,9 @@ bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end)
create->column_lengths[i] = create->column_lengths[i + 1];
}
create->column_names = MXS_REALLOC(create->column_names, sizeof(char*) * create->columns - 1);
create->column_types = MXS_REALLOC(create->column_types, sizeof(char*) * create->columns - 1);
create->column_lengths = MXS_REALLOC(create->column_lengths, sizeof(int) * create->columns - 1);
create->column_names = MXS_REALLOC(create->column_names, sizeof(char*) * (create->columns - 1));
create->column_types = MXS_REALLOC(create->column_types, sizeof(char*) * (create->columns - 1));
create->column_lengths = MXS_REALLOC(create->column_lengths, sizeof(int) * (create->columns - 1));
create->columns--;
updates++;
}

View File

@ -283,6 +283,7 @@ typedef struct avro_instance
pcre2_code *alter_table_re;
uint8_t event_types;
uint8_t event_type_hdr_lens[MAX_EVENT_TYPE_END];
uint8_t binlog_checksum;
gtid_pos_t gtid;
TABLE_MAP *active_maps[MAX_MAPPED_TABLES];
HASHTABLE *table_maps;

View File

@ -1559,6 +1559,8 @@ struct subcommand alteroptions[] =
"ssl_ca_cert Path to SSL CA certificate\n"
"ssl_version SSL version\n"
"ssl_cert_verify_depth Certificate verification depth\n"
"persistpoolmax Persisted connection pool size\n"
"persistmaxtime Persisted connection maximum idle time\n"
"\n"
"To configure SSL for a newly created server, the 'ssl', 'ssl_cert',\n"
"'ssl_key' and 'ssl_ca_cert' parameters must be given at the same time.\n"
@ -1898,7 +1900,7 @@ execute_cmd(CLI_SESSION *cli)
{
DCB *dcb = cli->session->client_dcb;
int argc, i, j, found = 0;
char *args[MAXARGS + 1];
char *args[MAXARGS + 4];
int in_quotes = 0, escape_next = 0;
char *ptr, *lptr;
bool in_space = false;

View File

@ -312,7 +312,7 @@ newSession(MXS_ROUTER *instance, MXS_SESSION *session)
*/
for (SERVER_REF *ref = inst->service->dbref; ref; ref = ref->next)
{
if (!SERVER_REF_IS_ACTIVE(ref) || SERVER_IN_MAINT(ref->server) || ref->weight == 0)
if (!SERVER_REF_IS_ACTIVE(ref) || SERVER_IN_MAINT(ref->server))
{
continue;
}
@ -370,6 +370,10 @@ newSession(MXS_ROUTER *instance, MXS_SESSION *session)
{
candidate = ref;
}
else if (ref->weight == 0 || candidate->weight == 0)
{
candidate = ref->weight ? ref : candidate;
}
else if (((ref->connections + 1) * 1000) / ref->weight <
((candidate->connections + 1) * 1000) / candidate->weight)
{