Merge branch 'release-1.2' into schemarouter_refresh

This commit is contained in:
Markus Makela
2015-07-13 19:10:05 +03:00
54 changed files with 550 additions and 315 deletions

View File

@ -35,7 +35,7 @@ extern __thread log_info_t tls_log_info;
* that routes to a named server if a regular expression match is found.
* @verbatim
*
* A simple regular expression query rewrite filter.
* A simple regular expression based query routing filter.
* Two parameters should be defined in the filter configuration
* match=<regular expression>
* server=<server to route statement to>

View File

@ -26,7 +26,8 @@
*
* Date Who Description
* 02/04/14 Mark Riddoch Initial implementation
* 11/05/15 Massimilaino Pinto Added mariadb10_compat to master and slave structs
* 11/05/15 Massimiliano Pinto Added mariadb10_compat to master and slave structs
* 12/06/15 Massimiliano Pinto Added mariadb10 new events
*
* @endverbatim
*/
@ -45,7 +46,77 @@
#define BINLOG_EVENT_HDR_LEN 19
/* How often to call the binlog status function (seconds) */
/**
* Binlog event types
*/
#define START_EVENT_V3 0x01
#define QUERY_EVENT 0x02
#define STOP_EVENT 0x03
#define ROTATE_EVENT 0x04
#define INTVAR_EVENT 0x05
#define LOAD_EVENT 0x06
#define SLAVE_EVENT 0x07
#define CREATE_FILE_EVENT 0x08
#define APPEND_BLOCK_EVENT 0x09
#define EXEC_LOAD_EVENT 0x0A
#define DELETE_FILE_EVENT 0x0B
#define NEW_LOAD_EVENT 0x0C
#define RAND_EVENT 0x0D
#define USER_VAR_EVENT 0x0E
#define FORMAT_DESCRIPTION_EVENT 0x0F
#define XID_EVENT 0x10
#define BEGIN_LOAD_QUERY_EVENT 0x11
#define EXECUTE_LOAD_QUERY_EVENT 0x12
#define TABLE_MAP_EVENT 0x13
#define WRITE_ROWS_EVENTv0 0x14
#define UPDATE_ROWS_EVENTv0 0x15
#define DELETE_ROWS_EVENTv0 0x16
#define WRITE_ROWS_EVENTv1 0x17
#define UPDATE_ROWS_EVENTv1 0x18
#define DELETE_ROWS_EVENTv1 0x19
#define INCIDENT_EVENT 0x1A
#define HEARTBEAT_EVENT 0x1B
#define IGNORABLE_EVENT 0x1C
#define ROWS_QUERY_EVENT 0x1D
#define WRITE_ROWS_EVENTv2 0x1E
#define UPDATE_ROWS_EVENTv2 0x1F
#define DELETE_ROWS_EVENTv2 0x20
#define GTID_EVENT 0x21
#define ANONYMOUS_GTID_EVENT 0x22
#define PREVIOUS_GTIDS_EVENT 0x23
#define MAX_EVENT_TYPE 0x23
/* New MariaDB event numbers start from 0xa0 */
#define MARIADB_NEW_EVENTS_BEGIN 0xa0
#define MARIADB_ANNOTATE_ROWS_EVENT 0xa0
/* New MariaDB 10 event numbers start from here */
#define MARIADB10_BINLOG_CHECKPOINT_EVENT 0xa1
#define MARIADB10_GTID_EVENT 0xa2
#define MARIADB10_GTID_GTID_LIST_EVENT 0xa3
#define MAX_EVENT_TYPE_MARIADB10 0xa3
/* Maximum event type so far */
#define MAX_EVENT_TYPE_END MAX_EVENT_TYPE_MARIADB10
/**
* Binlog event flags
*/
#define LOG_EVENT_BINLOG_IN_USE_F 0x0001
#define LOG_EVENT_FORCED_ROTATE_F 0x0002
#define LOG_EVENT_THREAD_SPECIFIC_F 0x0004
#define LOG_EVENT_SUPPRESS_USE_F 0x0008
#define LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F 0x0010
#define LOG_EVENT_ARTIFICIAL_F 0x0020
#define LOG_EVENT_RELAY_LOG_F 0x0040
#define LOG_EVENT_IGNORABLE_F 0x0080
#define LOG_EVENT_NO_FILTER_F 0x0100
#define LOG_EVENT_MTS_ISOLATE_F 0x0200
/**
* How often to call the binlog status function (seconds)
*/
#define BLR_STATS_FREQ 60
#define BLR_NSTATS_MINUTES 30
@ -212,7 +283,7 @@ typedef struct {
uint64_t n_fakeevents; /*< Fake events not written to disk */
uint64_t n_artificial; /*< Artificial events not written to disk */
int n_badcrc; /*< No. of bad CRC's from master */
uint64_t events[0x24]; /*< Per event counters */
uint64_t events[MAX_EVENT_TYPE_END + 1]; /*< Per event counters */
uint64_t lastsample;
int minno;
int minavgs[BLR_NSTATS_MINUTES];
@ -327,7 +398,7 @@ static char *blrm_states[] = { "Unconnected", "Connecting", "Authenticated", "Ti
"binlog checksum rerieval", "GTID Mode retrieval", "Master UUID retrieval",
"Set Slave UUID", "Set Names latin1", "Set Names utf8", "select 1",
"select version()", "select @@version_comment", "select @@hostname",
"select @@mx_allowed_packet", "Register slave", "Binlog Dump", "Set MariaDB slave capability" };
"select @@max_allowed_packet", "Register slave", "Binlog Dump", "Set MariaDB slave capability" };
#define BLRS_CREATED 0x0000
#define BLRS_UNREGISTERED 0x0001
@ -361,62 +432,6 @@ static char *blrs_states[] = { "Created", "Unregistered", "Registered",
#define COM_REGISTER_SLAVE 0x15
#define COM_BINLOG_DUMP 0x12
/**
* Binlog event types
*/
#define START_EVENT_V3 0x01
#define QUERY_EVENT 0x02
#define STOP_EVENT 0x03
#define ROTATE_EVENT 0x04
#define INTVAR_EVENT 0x05
#define LOAD_EVENT 0x06
#define SLAVE_EVENT 0x07
#define CREATE_FILE_EVENT 0x08
#define APPEND_BLOCK_EVENT 0x09
#define EXEC_LOAD_EVENT 0x0A
#define DELETE_FILE_EVENT 0x0B
#define NEW_LOAD_EVENT 0x0C
#define RAND_EVENT 0x0D
#define USER_VAR_EVENT 0x0E
#define FORMAT_DESCRIPTION_EVENT 0x0F
#define XID_EVENT 0x10
#define BEGIN_LOAD_QUERY_EVENT 0x11
#define EXECUTE_LOAD_QUERY_EVENT 0x12
#define TABLE_MAP_EVENT 0x13
#define WRITE_ROWS_EVENTv0 0x14
#define UPDATE_ROWS_EVENTv0 0x15
#define DELETE_ROWS_EVENTv0 0x16
#define WRITE_ROWS_EVENTv1 0x17
#define UPDATE_ROWS_EVENTv1 0x18
#define DELETE_ROWS_EVENTv1 0x19
#define INCIDENT_EVENT 0x1A
#define HEARTBEAT_EVENT 0x1B
#define IGNORABLE_EVENT 0x1C
#define ROWS_QUERY_EVENT 0x1D
#define WRITE_ROWS_EVENTv2 0x1E
#define UPDATE_ROWS_EVENTv2 0x1F
#define DELETE_ROWS_EVENTv2 0x20
#define GTID_EVENT 0x21
#define ANONYMOUS_GTID_EVENT 0x22
#define PREVIOUS_GTIDS_EVENT 0x23
#define MAX_EVENT_TYPE 0x23
#define MAX_EVENT_TYPE_MARIADB10 0xa3
/**
* Binlog event flags
*/
#define LOG_EVENT_BINLOG_IN_USE_F 0x0001
#define LOG_EVENT_FORCED_ROTATE_F 0x0002
#define LOG_EVENT_THREAD_SPECIFIC_F 0x0004
#define LOG_EVENT_SUPPRESS_USE_F 0x0008
#define LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F 0x0010
#define LOG_EVENT_ARTIFICIAL_F 0x0020
#define LOG_EVENT_RELAY_LOG_F 0x0040
#define LOG_EVENT_IGNORABLE_F 0x0080
#define LOG_EVENT_NO_FILTER_F 0x0100
#define LOG_EVENT_MTS_ISOLATE_F 0x0200
/**
* Macros to extract common fields
*/

View File

@ -1080,7 +1080,7 @@ gw_backend_hangup(DCB *dcb)
len = sizeof(error);
if (getsockopt(dcb->fd, SOL_SOCKET, SO_ERROR, &error, (socklen_t *)&len) == 0)
{
if (error != 0)
if (error != 0 && ses_state != SESSION_STATE_STOPPING)
{
strerror_r(error, buf, 100);
LOGIF(LE, (skygw_log_write_flush(
@ -1094,9 +1094,12 @@ gw_backend_hangup(DCB *dcb)
goto retblock;
}
#if defined(SS_DEBUG)
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Backend hangup error handling.")));
if(ses_state != SESSION_STATE_STOPPING)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Backend hangup error handling.")));
}
#endif
router->handleError(router_instance,

View File

@ -1092,7 +1092,8 @@ int gw_read_client_event(
case MYSQL_IDLE:
{
uint8_t* payload = NULL;
session_state_t ses_state;
session = dcb->session;
ss_dassert(session!= NULL);
@ -1100,93 +1101,106 @@ int gw_read_client_event(
{
CHK_SESSION(session);
}
spinlock_acquire(&session->ses_lock);
ses_state = session->state;
spinlock_release(&session->ses_lock);
/* Now, we are assuming in the first buffer there is
* the information form mysql command */
payload = GWBUF_DATA(read_buffer);
/** Route COM_QUIT to backend */
if (MYSQL_IS_COM_QUIT(payload))
{
if(ses_state == SESSION_STATE_ROUTER_READY)
{
/** Route COM_QUIT to backend */
if (MYSQL_IS_COM_QUIT(payload))
{
/**
* Sends COM_QUIT packets since buffer is already
* created. A BREF_CLOSED flag is set so dcb_close won't
* send redundant COM_QUIT.
*/
* Sends COM_QUIT packets since buffer is already
* created. A BREF_CLOSED flag is set so dcb_close won't
* send redundant COM_QUIT.
*/
SESSION_ROUTE_QUERY(session, read_buffer);
/**
* Close router session which causes closing of backends.
*/
* Close router session which causes closing of backends.
*/
dcb_close(dcb);
}
else
{
}
else
{
/** Reset error handler when routing of the new query begins */
router->handleError(NULL, NULL, NULL, dcb, ERRACT_RESET, NULL);
if (stmt_input)
{
/**
* Feed each statement completely and separately
* to router.
*/
rc = route_by_statement(session, &read_buffer);
if (read_buffer != NULL)
{
/** add incomplete mysql packet to read queue */
dcb->dcb_readqueue = gwbuf_append(dcb->dcb_readqueue, read_buffer);
}
/**
* Feed each statement completely and separately
* to router.
*/
rc = route_by_statement(session, &read_buffer);
if (read_buffer != NULL)
{
/** add incomplete mysql packet to read queue */
dcb->dcb_readqueue = gwbuf_append(dcb->dcb_readqueue, read_buffer);
}
}
else
{
/** Feed whole packet to router */
rc = SESSION_ROUTE_QUERY(session, read_buffer);
/** Feed whole packet to router */
rc = SESSION_ROUTE_QUERY(session, read_buffer);
}
/** Routing succeed */
if (rc)
{
rc = 0; /**< here '0' means success */
rc = 0; /**< here '0' means success */
}
else
{
bool succp;
GWBUF* errbuf;
/**
* Create error to be sent to client if session
* can't be continued.
*/
errbuf = mysql_create_custom_error(
1,
0,
"Routing failed. Session is closed.");
/**
* Ensure that there are enough backends
* available.
*/
router->handleError(
router_instance,
session->router_session,
errbuf,
dcb,
ERRACT_NEW_CONNECTION,
&succp);
gwbuf_free(errbuf);
/**
* If there are not enough backends close
* session
*/
if (!succp)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Routing the query failed. "
"Session will be closed.")));
bool succp;
GWBUF* errbuf;
/**
* Create error to be sent to client if session
* can't be continued.
*/
errbuf = mysql_create_custom_error(
1,
0,
"Routing failed. Session is closed.");
/**
* Ensure that there are enough backends
* available.
*/
router->handleError(
router_instance,
session->router_session,
errbuf,
dcb,
ERRACT_NEW_CONNECTION,
&succp);
gwbuf_free(errbuf);
/**
* If there are not enough backends close
* session
*/
if (!succp)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Routing the query failed. "
"Session will be closed.")));
dcb_close(dcb);
}
dcb_close(dcb);
}
}
}
}
}
else
{
skygw_log_write_flush(LT,"Session received a query in state %s",
STRSESSIONSTATE(ses_state));
while((read_buffer = GWBUF_CONSUME_ALL(read_buffer)) != NULL);
goto return_rc;
}
goto return_rc;
} /* MYSQL_IDLE */
break;

View File

@ -36,6 +36,7 @@
* 17/02/2015 Massimiliano Pinto Addition of slave port and username in diagnostics
* 18/02/2015 Massimiliano Pinto Addition of dcb_close in closeSession
* 07/05/2015 Massimiliano Pinto Addition of MariaDB 10 compatibility support
* 12/06/2015 Massimiliano Pinto Addition of MariaDB 10 events in diagnostics()
*
* @endverbatim
@ -682,6 +683,15 @@ static char *event_names[] = {
"Anonymous GTID Event", "Previous GTIDS Event"
};
/* New MariaDB event numbers starts from 0xa0 */
static char *event_names_mariadb10[] = {
"Annotate Rows Event",
/* New MariaDB 10.x event numbers */
"Binlog Checkpoint Event",
"GTID Event",
"GTID List Event"
};
/**
* Display an entry from the spinlock statistics data
*
@ -798,11 +808,28 @@ struct tm tm;
buf);
dcb_printf(dcb, "\t (%d seconds ago)\n",
time(0) - router_inst->stats.lastReply);
dcb_printf(dcb, "\tLast event from master: 0x%x, %s",
if (!router_inst->mariadb10_compat) {
dcb_printf(dcb, "\tLast event from master: 0x%x, %s",
router_inst->lastEventReceived,
(router_inst->lastEventReceived >= 0 &&
router_inst->lastEventReceived < 0x24) ?
router_inst->lastEventReceived <= MAX_EVENT_TYPE) ?
event_names[router_inst->lastEventReceived] : "unknown");
} else {
char *ptr = NULL;
if (router_inst->lastEventReceived >= 0 && router_inst->lastEventReceived <= MAX_EVENT_TYPE) {
ptr = event_names[router_inst->lastEventReceived];
} else {
/* Check MariaDB 10 new events */
if (router_inst->lastEventReceived >= MARIADB_NEW_EVENTS_BEGIN && router_inst->lastEventReceived <= MAX_EVENT_TYPE_MARIADB10) {
ptr = event_names_mariadb10[(router_inst->lastEventReceived - MARIADB_NEW_EVENTS_BEGIN)];
}
}
dcb_printf(dcb, "\tLast event from master: 0x%x, %s",
router_inst->lastEventReceived, (ptr != NULL) ? ptr : "unknown");
}
if (router_inst->lastEventTimestamp)
{
localtime_r(&router_inst->lastEventTimestamp, &tm);
@ -815,11 +842,17 @@ struct tm tm;
if (router_inst->reconnect_pending)
dcb_printf(dcb, "\tRouter pending reconnect to master\n");
dcb_printf(dcb, "\tEvents received:\n");
for (i = 0; i < 0x24; i++)
for (i = 0; i <= MAX_EVENT_TYPE; i++)
{
dcb_printf(dcb, "\t\t%-38s %u\n", event_names[i], router_inst->stats.events[i]);
}
if (router_inst->mariadb10_compat) {
/* Display MariaDB 10 new events */
for (i = MARIADB_NEW_EVENTS_BEGIN; i <= MAX_EVENT_TYPE_MARIADB10; i++)
dcb_printf(dcb, "\t\tMariaDB 10 %-38s %u\n", event_names_mariadb10[(i - MARIADB_NEW_EVENTS_BEGIN)], router_inst->stats.events[i]);
}
#if SPINLOCK_PROFILE
dcb_printf(dcb, "\tSpinlock statistics (instlock):\n");
spinlock_stats(&instlock, spin_reporter, dcb);

View File

@ -917,6 +917,7 @@ static REP_HEADER phdr;
phdr = hdr;
if (hdr.ok == 0)
{
int event_limit;
/*
* First check that the checksum we calculate matches the
* checksum in the packet we received.
@ -957,8 +958,11 @@ static REP_HEADER phdr;
#ifdef SHOW_EVENTS
printf("blr: event type 0x%02x, flags 0x%04x, event size %d", hdr.event_type, hdr.flags, hdr.event_size);
#endif
if (hdr.event_type >= 0 && hdr.event_type < 0x24)
event_limit = router->mariadb10_compat ? MAX_EVENT_TYPE_MARIADB10 : MAX_EVENT_TYPE;
if (hdr.event_type >= 0 && hdr.event_type <= event_limit)
router->stats.events[hdr.event_type]++;
if (hdr.event_type == FORMAT_DESCRIPTION_EVENT && hdr.next_pos == 0)
{
// Fake format description message

View File

@ -723,16 +723,10 @@ routeQuery(ROUTER *instance, void *router_session, GWBUF *queue)
SERVER_IS_DOWN(router_cli_ses->backend->server))
{
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
LOGFILE_TRACE|LOGFILE_ERROR,
"Error : Failed to route MySQL command %d to backend "
"server.",
mysql_command)));
skygw_log_write(
LOGFILE_ERROR,
"Error : Failed to route MySQL command %d to backend "
"server %s.",
mysql_command,
router_cli_ses->backend->server->unique_name);
"server.%s",
mysql_command,rses_is_closed ? " Session is closed." : "")));
rc = 0;
goto return_rc;

View File

@ -3787,7 +3787,7 @@ static GWBUF* sescmd_cursor_process_replies(
dcb_close(bref->bref_dcb);
*reconnect = true;
if(replybuf)
gwbuf_consume(replybuf,gwbuf_length(replybuf));
while((replybuf = gwbuf_consume(replybuf,gwbuf_length(replybuf))));
}
}
/** This is a response from the master and it is the "right" one.
@ -3830,7 +3830,7 @@ static GWBUF* sescmd_cursor_process_replies(
skygw_log_write(LOGFILE_DEBUG,"Slave '%s' responded faster to a session command.",
bref->bref_backend->backend_server->unique_name);
if(replybuf)
gwbuf_free(replybuf);
while((replybuf = gwbuf_consume(replybuf,gwbuf_length(replybuf))));
return NULL;
}
@ -3915,7 +3915,7 @@ static GWBUF* sescmd_cursor_clone_querybuf(
}
ss_dassert(scur->scmd_cur_cmd != NULL);
buf = gwbuf_clone(scur->scmd_cur_cmd->my_sescmd_buf);
buf = gwbuf_clone_all(scur->scmd_cur_cmd->my_sescmd_buf);
CHK_GWBUF(buf);
return buf;
@ -4011,7 +4011,7 @@ static bool execute_sescmd_in_backend(
bool succp;
int rc = 0;
sescmd_cursor_t* scur;
GWBUF* buf;
if(backend_ref == NULL)
{
skygw_log_write(LE,"Error: NULL parameter passed to execute_sescmd_in_backend. (%s:%d)",__FILE__,__LINE__);
@ -4048,27 +4048,9 @@ static bool execute_sescmd_in_backend(
/** Cursor is left active when function returns. */
sescmd_cursor_set_active(scur, true);
}
#if defined(SS_DEBUG)
LOGIF(LT, tracelog_routed_query(scur->scmd_cur_rses,
"execute_sescmd_in_backend",
backend_ref,
sescmd_cursor_clone_querybuf(scur)));
{
GWBUF* tmpbuf = sescmd_cursor_clone_querybuf(scur);
uint8_t* ptr = GWBUF_DATA(tmpbuf);
unsigned char cmd = MYSQL_GET_COMMAND(ptr);
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [execute_sescmd_in_backend] Just before write, fd "
"%d : cmd %s.",
pthread_self(),
dcb->fd,
STRPACKETTYPE(cmd))));
gwbuf_free(tmpbuf);
}
#endif /*< SS_DEBUG */
buf = sescmd_cursor_clone_querybuf(scur);
switch (scur->scmd_cur_cmd->my_sescmd_packet_type) {
case MYSQL_COM_CHANGE_USER:
/** This makes it possible to handle replies correctly */
@ -4077,7 +4059,7 @@ static bool execute_sescmd_in_backend(
dcb,
NULL,
dcb->session,
sescmd_cursor_clone_querybuf(scur));
buf);
break;
case MYSQL_COM_INIT_DB:
@ -4103,10 +4085,11 @@ static bool execute_sescmd_in_backend(
* Mark session command buffer, it triggers writing
* MySQL command to protocol
*/
gwbuf_set_type(scur->scmd_cur_cmd->my_sescmd_buf, GWBUF_TYPE_SESCMD);
rc = dcb->func.write(
dcb,
sescmd_cursor_clone_querybuf(scur));
buf);
break;
}
@ -4116,6 +4099,7 @@ static bool execute_sescmd_in_backend(
}
else
{
while((buf = GWBUF_CONSUME_ALL(buf)) != NULL);
succp = false;
}
return_succp:

View File

@ -46,7 +46,7 @@ bool extract_database(GWBUF* buf, char* str)
tok = strtok_r(query," ;",&saved);
if(tok == NULL || strcasecmp(tok,"use") != 0)
{
skygw_log_write(LOGFILE_ERROR,"Schemarouter: Malformed chage database packet.");
skygw_log_write(LOGFILE_ERROR,"extract_database: Malformed chage database packet.");
succp = false;
goto retblock;
}
@ -54,7 +54,7 @@ bool extract_database(GWBUF* buf, char* str)
tok = strtok_r(NULL," ;",&saved);
if(tok == NULL)
{
skygw_log_write(LOGFILE_ERROR,"Schemarouter: Malformed chage database packet.");
skygw_log_write(LOGFILE_ERROR,"extract_database: Malformed chage database packet.");
succp = false;
goto retblock;
}

View File

@ -1701,7 +1701,7 @@ routeQuery(ROUTER* instance,
querybuf)))
{
extract_database(querybuf,db);
snprintf(errbuf,"Unknown database: %s",db);
snprintf(errbuf,25+MYSQL_DATABASE_MAXLEN,"Unknown database: %s",db);
create_error_reply(errbuf,router_cli_ses->replydcb);
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,