Remove queuing that is no longer required

Fixed bug in mysql_backend.c that lost up to 4 bytes of data in rare
circumstances.
This commit is contained in:
Mark Riddoch
2014-09-10 15:51:53 +01:00
parent c273988e51
commit f9aece5113
4 changed files with 155 additions and 224 deletions

View File

@ -126,7 +126,6 @@ typedef struct {
int n_registered; /*< Number of registered slaves */ int n_registered; /*< Number of registered slaves */
int n_masterstarts; /*< Number of times connection restarted */ int n_masterstarts; /*< Number of times connection restarted */
int n_delayedreconnects; int n_delayedreconnects;
int n_queueadd; /*< Number of times incoming data was added to processing queue */
int n_residuals; /*< Number of times residual data was buffered */ int n_residuals; /*< Number of times residual data was buffered */
int n_heartbeats; /*< Number of heartbeat messages */ int n_heartbeats; /*< Number of heartbeat messages */
time_t lastReply; time_t lastReply;
@ -216,10 +215,9 @@ typedef struct router_instance {
unsigned int high_water; /*< High water mark for client DCB */ unsigned int high_water; /*< High water mark for client DCB */
BLCACHE *cache[2]; BLCACHE *cache[2];
ROUTER_STATS stats; /*< Statistics for this router */ ROUTER_STATS stats; /*< Statistics for this router */
SPINLOCK alock;
int active_logs; int active_logs;
int reconnect_pending; int reconnect_pending;
GWBUF *queue; int handling_threads;
struct router_instance struct router_instance
*next; *next;
} ROUTER_INSTANCE; } ROUTER_INSTANCE;

View File

@ -497,7 +497,7 @@ static int gw_read_backend_event(DCB *dcb) {
{ {
if (nbytes_read < 5) if (nbytes_read < 5)
{ {
gwbuf_append(dcb->dcb_readqueue, read_buffer); dcb->dcb_readqueue = gwbuf_append(dcb->dcb_readqueue, read_buffer);
rc = 0; rc = 0;
goto return_rc; goto return_rc;
} }

View File

@ -75,9 +75,10 @@ static void clientReply(
static void errorReply( static void errorReply(
ROUTER *instance, ROUTER *instance,
void *router_session, void *router_session,
char *message, GWBUF *message,
DCB *backend_dcb, DCB *backend_dcb,
int action); error_action_t action,
bool *succp);
static uint8_t getCapabilities (ROUTER* inst, void* router_session); static uint8_t getCapabilities (ROUTER* inst, void* router_session);
@ -275,10 +276,9 @@ int i;
instances = inst; instances = inst;
spinlock_release(&instlock); spinlock_release(&instlock);
spinlock_init(&inst->alock);
inst->active_logs = 0; inst->active_logs = 0;
inst->reconnect_pending = 0; inst->reconnect_pending = 0;
inst->queue = NULL; inst->handling_threads = 0;
inst->residual = NULL; inst->residual = NULL;
inst->slaves = NULL; inst->slaves = NULL;
inst->next = NULL; inst->next = NULL;
@ -582,10 +582,6 @@ struct tm tm;
router_inst->stats.n_heartbeats); router_inst->stats.n_heartbeats);
dcb_printf(dcb, "\tNumber of packets received: %u\n", dcb_printf(dcb, "\tNumber of packets received: %u\n",
router_inst->stats.n_reads); router_inst->stats.n_reads);
dcb_printf(dcb, "\tNumber of packets queued: %u\n",
router_inst->stats.n_queueadd);
dcb_printf(dcb, "\tCurrent length of incoming queue: %d\n",
gwbuf_length(router_inst->queue));
dcb_printf(dcb, "\tNumber of residual data packets: %u\n", dcb_printf(dcb, "\tNumber of residual data packets: %u\n",
router_inst->stats.n_residuals); router_inst->stats.n_residuals);
dcb_printf(dcb, "\tAverage events per packet %.1f\n", dcb_printf(dcb, "\tAverage events per packet %.1f\n",
@ -611,8 +607,6 @@ struct tm tm;
spinlock_stats(&instlock, spin_reporter, dcb); spinlock_stats(&instlock, spin_reporter, dcb);
dcb_printf(dcb, "\tSpinlock statistics (instance lock):\n"); dcb_printf(dcb, "\tSpinlock statistics (instance lock):\n");
spinlock_stats(&router_inst->lock, spin_reporter, dcb); spinlock_stats(&router_inst->lock, spin_reporter, dcb);
dcb_printf(dcb, "\tSpinlock statistics (active log lock):\n");
spinlock_stats(&router_inst->alock, spin_reporter, dcb);
#endif #endif
if (router_inst->slaves) if (router_inst->slaves)
@ -710,18 +704,15 @@ ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance;
* @param message The error message to reply * @param message The error message to reply
* @param backend_dcb The backend DCB * @param backend_dcb The backend DCB
* @param action The action: REPLY, REPLY_AND_CLOSE, NEW_CONNECTION * @param action The action: REPLY, REPLY_AND_CLOSE, NEW_CONNECTION
* @param succp Result of action
* *
*/ */
static void static void
errorReply( errorReply(ROUTER *instance, void *router_session, GWBUF *message, DCB *backend_dcb, error_action_t action, bool *succp)
ROUTER *instance,
void *router_session,
char *message,
DCB *backend_dcb,
int action)
{ {
LOGIF(LE, (skygw_log_write_flush( LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR, "Erorr Reply '%s'", message))); LOGFILE_ERROR, "Erorr Reply '%s'", message)));
*succp = false;
} }
/** to be inline'd */ /** to be inline'd */

View File

@ -145,18 +145,11 @@ GWBUF *ptr;
} }
router->residual = NULL; router->residual = NULL;
/* Discard the queued data */
ptr = router->queue;
while (ptr)
{
ptr = gwbuf_consume(ptr, GWBUF_LENGTH(ptr));
}
router->queue = NULL;
/* Now it is safe to unleash other threads on this router instance */ /* Now it is safe to unleash other threads on this router instance */
spinlock_acquire(&router->alock); spinlock_acquire(&router->lock);
router->reconnect_pending = 0; router->reconnect_pending = 0;
router->active_logs = 0; router->active_logs = 0;
spinlock_release(&router->alock); spinlock_release(&router->lock);
blr_start_master(router); blr_start_master(router);
} }
@ -175,7 +168,7 @@ blr_master_reconnect(ROUTER_INSTANCE *router)
{ {
int do_reconnect = 0; int do_reconnect = 0;
spinlock_acquire(&router->alock); spinlock_acquire(&router->lock);
if (router->active_logs) if (router->active_logs)
{ {
/* Currently processing a response, set a flag /* Currently processing a response, set a flag
@ -190,13 +183,13 @@ int do_reconnect = 0;
router->active_logs = 1; router->active_logs = 1;
do_reconnect = 1; do_reconnect = 1;
} }
spinlock_release(&router->alock); spinlock_release(&router->lock);
if (do_reconnect) if (do_reconnect)
{ {
blr_restart_master(router); blr_restart_master(router);
spinlock_acquire(&router->alock); spinlock_acquire(&router->lock);
router->active_logs = 0; router->active_logs = 0;
spinlock_release(&router->alock); spinlock_release(&router->lock);
} }
} }
@ -214,76 +207,29 @@ blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf)
{ {
char query[128]; char query[128];
/* atomic_add(&router->handling_threads, 1);
* We need to make sure that incoming packets (gwbufs) are ss_dassert(router->handling_threads == 1);
* strictly processed in order and that we do not have packets spinlock_acquire(&router->lock);
* from the same master being processed on multiple threads. router->active_logs = 1;
* To do this we create a queue of the GWBUF structures and have spinlock_release(&router->lock);
* a flag that indicates if this routine is processing a packet
* on another thread. Items will be added to the queue if the
* routine is running in another thread. That thread will read
* the queue before returning.
*
* The action of adding items to the queue is protected by a
* spinlock and a flag that indicates if the routine running
* in the other thread has reached the point at which it will
* no longer check the queue before returning. In order to
* manipulate the queue or the flag the router spinlock must
* be held.
*/
spinlock_acquire(&router->alock);
if (router->active_logs)
{
/*
* Thread already processing a packet and has not got
* to the point that it will not look at new packets
* added to the queue.
*/
router->stats.n_queueadd++;
router->queue = gwbuf_append(router->queue, buf);
spinlock_release(&router->alock);
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE, "Queued data due to active log "
"handling. %s @ %d, queue length %d\n",
router->binlog_name,
router->binlog_position,
gwbuf_length(router->queue))));
return;
}
else
{
router->active_logs = 1;
if (router->queue)
{
GWBUF *tmp;
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR, "Found an unexpected queue item"
" prepending queue of length %d.\n",
gwbuf_length(router->queue))));
tmp = gwbuf_append(router->queue, buf);
buf = tmp;
router->queue = NULL;
}
}
spinlock_release(&router->alock);
if (router->master_state < 0 || router->master_state > BLRM_MAXSTATE) if (router->master_state < 0 || router->master_state > BLRM_MAXSTATE)
{ {
LOGIF(LE, (skygw_log_write( LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR, "Invalid master state machine state (%d) for binlog router.\n", LOGFILE_ERROR, "Invalid master state machine state (%d) for binlog router.\n",
router->master_state))); router->master_state)));
gwbuf_consume(buf, gwbuf_length(buf)); gwbuf_consume(buf, gwbuf_length(buf));
spinlock_acquire(&router->alock); spinlock_acquire(&router->lock);
if (router->reconnect_pending) if (router->reconnect_pending)
{ {
router->active_logs = 0; router->active_logs = 0;
spinlock_release(&router->alock); spinlock_release(&router->lock);
atomic_add(&router->handling_threads, -1);
blr_restart_master(router); blr_restart_master(router);
return; return;
} }
router->active_logs = 0; router->active_logs = 0;
spinlock_release(&router->alock); spinlock_release(&router->lock);
atomic_add(&router->handling_threads, -1);
return; return;
} }
@ -295,139 +241,125 @@ char query[128];
MYSQL_ERROR_CODE(buf), MYSQL_ERROR_MSG(buf), blrm_states[router->master_state] MYSQL_ERROR_CODE(buf), MYSQL_ERROR_MSG(buf), blrm_states[router->master_state]
))); )));
gwbuf_consume(buf, gwbuf_length(buf)); gwbuf_consume(buf, gwbuf_length(buf));
spinlock_acquire(&router->alock); spinlock_acquire(&router->lock);
router->active_logs = 0; router->active_logs = 0;
if (router->reconnect_pending) if (router->reconnect_pending)
{ {
spinlock_release(&router->alock); spinlock_release(&router->lock);
atomic_add(&router->handling_threads, -1);
blr_restart_master(router); blr_restart_master(router);
return; return;
} }
spinlock_release(&router->alock); spinlock_release(&router->lock);
atomic_add(&router->handling_threads, -1);
return; return;
} }
do { switch (router->master_state)
switch (router->master_state) {
{ case BLRM_TIMESTAMP:
case BLRM_TIMESTAMP: // Response to a timestamp message, no need to save this.
// Response to a timestamp message, no need to save this. gwbuf_consume(buf, GWBUF_LENGTH(buf));
gwbuf_consume(buf, GWBUF_LENGTH(buf)); buf = blr_make_query("SHOW VARIABLES LIKE 'SERVER_ID'");
buf = blr_make_query("SHOW VARIABLES LIKE 'SERVER_ID'"); router->master_state = BLRM_SERVERID;
router->master_state = BLRM_SERVERID; router->master->func.write(router->master, buf);
router->master->func.write(router->master, buf); break;
break; case BLRM_SERVERID:
case BLRM_SERVERID: // Response to fetch of master's server-id
// Response to fetch of master's server-id router->saved_master.server_id = buf;
router->saved_master.server_id = buf; // TODO: Extract the value of server-id and place in router->master_id
// TODO: Extract the value of server-id and place in router->master_id buf = blr_make_query("SET @master_heartbeat_period = 1799999979520");
buf = blr_make_query("SET @master_heartbeat_period = 1799999979520"); router->master_state = BLRM_HBPERIOD;
router->master_state = BLRM_HBPERIOD; router->master->func.write(router->master, buf);
router->master->func.write(router->master, buf); break;
break; case BLRM_HBPERIOD:
case BLRM_HBPERIOD: // Response to set the heartbeat period
// Response to set the heartbeat period router->saved_master.heartbeat = buf;
router->saved_master.heartbeat = buf; buf = blr_make_query("SET @master_binlog_checksum = @@global.binlog_checksum");
buf = blr_make_query("SET @master_binlog_checksum = @@global.binlog_checksum"); router->master_state = BLRM_CHKSUM1;
router->master_state = BLRM_CHKSUM1; router->master->func.write(router->master, buf);
router->master->func.write(router->master, buf); break;
break; case BLRM_CHKSUM1:
case BLRM_CHKSUM1: // Response to set the master binlog checksum
// Response to set the master binlog checksum router->saved_master.chksum1 = buf;
router->saved_master.chksum1 = buf; buf = blr_make_query("SELECT @master_binlog_checksum");
buf = blr_make_query("SELECT @master_binlog_checksum"); router->master_state = BLRM_CHKSUM2;
router->master_state = BLRM_CHKSUM2; router->master->func.write(router->master, buf);
router->master->func.write(router->master, buf); break;
break; case BLRM_CHKSUM2:
case BLRM_CHKSUM2: // Response to the master_binlog_checksum, should be stored
// Response to the master_binlog_checksum, should be stored router->saved_master.chksum2 = buf;
router->saved_master.chksum2 = buf; buf = blr_make_query("SELECT @@GLOBAL.GTID_MODE");
buf = blr_make_query("SELECT @@GLOBAL.GTID_MODE"); router->master_state = BLRM_GTIDMODE;
router->master_state = BLRM_GTIDMODE; router->master->func.write(router->master, buf);
router->master->func.write(router->master, buf); break;
break; case BLRM_GTIDMODE:
case BLRM_GTIDMODE: // Response to the GTID_MODE, should be stored
// Response to the GTID_MODE, should be stored router->saved_master.gtid_mode = buf;
router->saved_master.gtid_mode = buf; buf = blr_make_query("SHOW VARIABLES LIKE 'SERVER_UUID'");
buf = blr_make_query("SHOW VARIABLES LIKE 'SERVER_UUID'"); router->master_state = BLRM_MUUID;
router->master_state = BLRM_MUUID; router->master->func.write(router->master, buf);
router->master->func.write(router->master, buf); break;
break; case BLRM_MUUID:
case BLRM_MUUID: // Response to the SERVER_UUID, should be stored
// Response to the SERVER_UUID, should be stored router->saved_master.uuid = buf;
router->saved_master.uuid = buf; sprintf(query, "SET @slave_uuid='%s'", router->uuid);
sprintf(query, "SET @slave_uuid='%s'", router->uuid); buf = blr_make_query(query);
buf = blr_make_query(query); router->master_state = BLRM_SUUID;
router->master_state = BLRM_SUUID; router->master->func.write(router->master, buf);
router->master->func.write(router->master, buf); break;
break; case BLRM_SUUID:
case BLRM_SUUID: // Response to the SET @server_uuid, should be stored
// Response to the SET @server_uuid, should be stored router->saved_master.setslaveuuid = buf;
router->saved_master.setslaveuuid = buf; buf = blr_make_query("SET NAMES latin1");
buf = blr_make_query("SET NAMES latin1"); router->master_state = BLRM_LATIN1;
router->master_state = BLRM_LATIN1; router->master->func.write(router->master, buf);
router->master->func.write(router->master, buf); break;
break; case BLRM_LATIN1:
case BLRM_LATIN1: // Response to the SET NAMES latin1, should be stored
// Response to the SET NAMES latin1, should be stored router->saved_master.setnames = buf;
router->saved_master.setnames = buf; buf = blr_make_query("SET NAMES utf8");
buf = blr_make_query("SET NAMES utf8"); router->master_state = BLRM_UTF8;
router->master_state = BLRM_UTF8; router->master->func.write(router->master, buf);
router->master->func.write(router->master, buf); break;
break; case BLRM_UTF8:
case BLRM_UTF8: // Response to the SET NAMES utf8, should be stored
// Response to the SET NAMES utf8, should be stored router->saved_master.utf8 = buf;
router->saved_master.utf8 = buf; buf = blr_make_query("SELECT 1");
buf = blr_make_query("SELECT 1"); router->master_state = BLRM_SELECT1;
router->master_state = BLRM_SELECT1; router->master->func.write(router->master, buf);
router->master->func.write(router->master, buf); break;
break; case BLRM_SELECT1:
case BLRM_SELECT1: // Response to the SELECT 1, should be stored
// Response to the SELECT 1, should be stored router->saved_master.select1 = buf;
router->saved_master.select1 = buf; buf = blr_make_query("SELECT VERSION();");
buf = blr_make_query("SELECT VERSION();"); router->master_state = BLRM_SELECTVER;
router->master_state = BLRM_SELECTVER; router->master->func.write(router->master, buf);
router->master->func.write(router->master, buf); break;
break; case BLRM_SELECTVER:
case BLRM_SELECTVER: // Response to SELECT VERSION should be stored
// Response to SELECT VERSION should be stored router->saved_master.selectver = buf;
router->saved_master.selectver = buf; buf = blr_make_registration(router);
buf = blr_make_registration(router); router->master_state = BLRM_REGISTER;
router->master_state = BLRM_REGISTER; router->master->func.write(router->master, buf);
router->master->func.write(router->master, buf); break;
break; case BLRM_REGISTER:
case BLRM_REGISTER: // Request a dump of the binlog file
// Request a dump of the binlog file buf = blr_make_binlog_dump(router);
buf = blr_make_binlog_dump(router); router->master_state = BLRM_BINLOGDUMP;
router->master_state = BLRM_BINLOGDUMP; router->master->func.write(router->master, buf);
router->master->func.write(router->master, buf); break;
break; case BLRM_BINLOGDUMP:
case BLRM_BINLOGDUMP: // Main body, we have received a binlog record from the master
// Main body, we have received a binlog record from the master blr_handle_binlog_record(router, buf);
blr_handle_binlog_record(router, buf); break;
break; }
}
/* if (router->reconnect_pending)
* Check for messages queued by other threads. blr_restart_master(router);
*/ spinlock_acquire(&router->lock);
spinlock_acquire(&router->alock); router->active_logs = 0;
if ((buf = router->queue) != NULL) spinlock_release(&router->lock);
{ atomic_add(&router->handling_threads, -1);
router->queue = NULL;
}
else
{
if (router->reconnect_pending)
{
blr_restart_master(router);
}
else
{
router->active_logs = 0;
}
}
spinlock_release(&router->alock);
} while (buf != NULL);
} }
/** /**
@ -548,13 +480,15 @@ encode_value(unsigned char *data, unsigned int value, int len)
static void static void
blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
{ {
uint8_t *msg = NULL, *ptr, *pdata; uint8_t *msg = NULL, *ptr, *pdata;
REP_HEADER hdr; REP_HEADER hdr;
int len, reslen; unsigned int len, reslen;
int no_residual = 1; unsigned int pkt_length;
int preslen = -1; int no_residual = 1;
int prev_length = -1; int preslen = -1;
int n_bufs = -1, pn_bufs = -1; int prev_length = -1;
int n_bufs = -1, pn_bufs = -1;
static REP_HEADER phdr;
/* /*
* Prepend any residual buffer to the buffer chain we have * Prepend any residual buffer to the buffer chain we have
@ -567,7 +501,8 @@ int n_bufs = -1, pn_bufs = -1;
no_residual = 0; no_residual = 0;
} }
while (pkt && gwbuf_length(pkt) > 24) pkt_length = gwbuf_length(pkt);
while (pkt && pkt_length > 24)
{ {
reslen = GWBUF_LENGTH(pkt); reslen = GWBUF_LENGTH(pkt);
pdata = GWBUF_DATA(pkt); pdata = GWBUF_DATA(pkt);
@ -595,7 +530,7 @@ int n_bufs = -1, pn_bufs = -1;
len = extract_field(pdata, 24) + 4; len = extract_field(pdata, 24) + 4;
} }
if (reslen < len && gwbuf_length(pkt) >= len) if (reslen < len && pkt_length >= len)
{ {
/* /*
* The message is contained in more than the current * The message is contained in more than the current
@ -703,6 +638,7 @@ int n_bufs = -1, pn_bufs = -1;
} }
break; break;
} }
phdr = hdr;
if (hdr.ok == 0) if (hdr.ok == 0)
{ {
router->stats.n_binlogs++; router->stats.n_binlogs++;
@ -810,6 +746,7 @@ int n_bufs = -1, pn_bufs = -1;
n = (plen < len ? plen : len); n = (plen < len ? plen : len);
pkt = gwbuf_consume(pkt, n); pkt = gwbuf_consume(pkt, n);
len -= n; len -= n;
pkt_length -= n;
} }
preslen = reslen; preslen = reslen;
pn_bufs = n_bufs; pn_bufs = n_bufs;
@ -822,6 +759,11 @@ int n_bufs = -1, pn_bufs = -1;
if (pkt) if (pkt)
{ {
router->residual = pkt; router->residual = pkt;
ss_dassert(pkt_length != 0);
}
else
{
ss_dassert(pkt_length == 0);
} }
blr_file_flush(router); blr_file_flush(router);
} }
@ -892,7 +834,7 @@ char file[BINLOG_FNAMELEN+1];
memcpy(file, ptr + 8, slen); memcpy(file, ptr + 8, slen);
file[slen] = 0; file[slen] = 0;
#ifdef VEBOSE_ROTATE #ifdef VERBOSE_ROTATE
printf("binlog rotate: "); printf("binlog rotate: ");
while (len--) while (len--)
printf("0x%02x ", *ptr++); printf("0x%02x ", *ptr++);