diff --git a/server/modules/include/blr.h b/server/modules/include/blr.h index 9aaf450af..f493ec715 100644 --- a/server/modules/include/blr.h +++ b/server/modules/include/blr.h @@ -126,7 +126,6 @@ typedef struct { int n_registered; /*< Number of registered slaves */ int n_masterstarts; /*< Number of times connection restarted */ int n_delayedreconnects; - int n_queueadd; /*< Number of times incoming data was added to processign queue */ int n_residuals; /*< Number of times residual data was buffered */ int n_heartbeats; /*< Number of heartbeat messages */ time_t lastReply; @@ -216,10 +215,9 @@ typedef struct router_instance { unsigned int high_water; /*< High water mark for client DCB */ BLCACHE *cache[2]; ROUTER_STATS stats; /*< Statistics for this router */ - SPINLOCK alock; int active_logs; int reconnect_pending; - GWBUF *queue; + int handling_threads; struct router_instance *next; } ROUTER_INSTANCE; diff --git a/server/modules/protocol/mysql_backend.c b/server/modules/protocol/mysql_backend.c index 5d8088d4e..44b92b1b9 100644 --- a/server/modules/protocol/mysql_backend.c +++ b/server/modules/protocol/mysql_backend.c @@ -497,7 +497,7 @@ static int gw_read_backend_event(DCB *dcb) { { if (nbytes_read < 5) { - gwbuf_append(dcb->dcb_readqueue, read_buffer); + dcb->dcb_readqueue = gwbuf_append(dcb->dcb_readqueue, read_buffer); rc = 0; goto return_rc; } diff --git a/server/modules/routing/binlog/blr.c b/server/modules/routing/binlog/blr.c index 94bcbc6cd..dec20f8b4 100644 --- a/server/modules/routing/binlog/blr.c +++ b/server/modules/routing/binlog/blr.c @@ -75,9 +75,10 @@ static void clientReply( static void errorReply( ROUTER *instance, void *router_session, - char *message, + GWBUF *message, DCB *backend_dcb, - int action); + error_action_t action, + bool *succp); static uint8_t getCapabilities (ROUTER* inst, void* router_session); @@ -275,10 +276,9 @@ int i; instances = inst; spinlock_release(&instlock); - spinlock_init(&inst->alock); inst->active_logs = 0; inst->reconnect_pending = 0; - inst->queue = NULL; + inst->handling_threads = 0; inst->residual = NULL; inst->slaves = NULL; inst->next = NULL; @@ -582,10 +582,6 @@ struct tm tm; router_inst->stats.n_heartbeats); dcb_printf(dcb, "\tNumber of packets received: %u\n", router_inst->stats.n_reads); - dcb_printf(dcb, "\tNumber of packets queued: %u\n", - router_inst->stats.n_queueadd); - dcb_printf(dcb, "\tCurrent length of incoming queue: %d\n", - gwbuf_length(router_inst->queue)); dcb_printf(dcb, "\tNumber of residual data packets: %u\n", router_inst->stats.n_residuals); dcb_printf(dcb, "\tAverage events per packet %.1f\n", @@ -611,8 +607,6 @@ struct tm tm; spinlock_stats(&instlock, spin_reporter, dcb); dcb_printf(dcb, "\tSpinlock statistics (instance lock):\n"); spinlock_stats(&router_inst->lock, spin_reporter, dcb); - dcb_printf(dcb, "\tSpinlock statistics (active log lock):\n"); - spinlock_stats(&router_inst->alock, spin_reporter, dcb); #endif if (router_inst->slaves) @@ -710,18 +704,15 @@ ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance; * @param message The error message to reply * @param backend_dcb The backend DCB * @param action The action: REPLY, REPLY_AND_CLOSE, NEW_CONNECTION + * @param succp Result of action * */ static void -errorReply( - ROUTER *instance, - void *router_session, - char *message, - DCB *backend_dcb, - int action) +errorReply(ROUTER *instance, void *router_session, GWBUF *message, DCB *backend_dcb, error_action_t action, bool *succp) { LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, "Erorr Reply '%s'", message))); + *succp = false; } /** to be inline'd */ diff --git a/server/modules/routing/binlog/blr_master.c b/server/modules/routing/binlog/blr_master.c index d9bc40f48..412276e48 100644 --- a/server/modules/routing/binlog/blr_master.c +++ b/server/modules/routing/binlog/blr_master.c @@ -145,18 +145,11 @@ GWBUF *ptr; } router->residual = NULL; - /* Discard the queued data */ - ptr = router->queue; - while (ptr) - { - ptr = gwbuf_consume(ptr, GWBUF_LENGTH(ptr)); - } - router->queue = NULL; /* Now it is safe to unleash other threads on this router instance */ - spinlock_acquire(&router->alock); + spinlock_acquire(&router->lock); router->reconnect_pending = 0; router->active_logs = 0; - spinlock_release(&router->alock); + spinlock_release(&router->lock); blr_start_master(router); } @@ -175,7 +168,7 @@ blr_master_reconnect(ROUTER_INSTANCE *router) { int do_reconnect = 0; - spinlock_acquire(&router->alock); + spinlock_acquire(&router->lock); if (router->active_logs) { /* Currently processing a response, set a flag @@ -190,13 +183,13 @@ int do_reconnect = 0; router->active_logs = 1; do_reconnect = 1; } - spinlock_release(&router->alock); + spinlock_release(&router->lock); if (do_reconnect) { blr_restart_master(router); - spinlock_acquire(&router->alock); + spinlock_acquire(&router->lock); router->active_logs = 0; - spinlock_release(&router->alock); + spinlock_release(&router->lock); } } @@ -214,76 +207,29 @@ blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf) { char query[128]; - /* - * We need to make sure that incoming packets (gwbufs) are - * strictly processed in order and that we do not have packets - * from the same master being processed on multiple threads. - * To do this we create a queue of the GWBUF structures and have - * a flag that indicates if this routine is processing a packet - * on another thread. Items will be added to the queue if the - * routine is running in another thread. That thread will read - * the queue before returning. - * - * The action of adding items to the queue is protected by a - * spinlock and a flag that inidicates if the routine running - * in the other thread has reached the point at which it will - * no longer check the queue before returning. In order to - * manipulate the queue or the flag the router spinlock must - * be held. - */ - spinlock_acquire(&router->alock); - if (router->active_logs) - { - /* - * Thread already processing a packet and has not got - * to the point that it will not look at new packets - * added to the queue. - */ - router->stats.n_queueadd++; - router->queue = gwbuf_append(router->queue, buf); - spinlock_release(&router->alock); - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, "Queued data due to active log " - "handling. %s @ %d, queue length %d\n", - router->binlog_name, - router->binlog_position, - gwbuf_length(router->queue)))); - return; - } - else - { - router->active_logs = 1; - if (router->queue) - { - GWBUF *tmp; - - LOGIF(LE, (skygw_log_write( - LOGFILE_ERROR, "Found an unexpected queue item" - " prepending queue of length %d.\n", - gwbuf_length(router->queue)))); - tmp = gwbuf_append(router->queue, buf); - buf = tmp; - router->queue = NULL; - } - } - spinlock_release(&router->alock); - + atomic_add(&router->handling_threads, 1); + ss_dassert(router->handling_threads == 1); + spinlock_acquire(&router->lock); + router->active_logs = 1; + spinlock_release(&router->lock); if (router->master_state < 0 || router->master_state > BLRM_MAXSTATE) { LOGIF(LE, (skygw_log_write( LOGFILE_ERROR, "Invalid master state machine state (%d) for binlog router.\n", router->master_state))); gwbuf_consume(buf, gwbuf_length(buf)); - spinlock_acquire(&router->alock); + spinlock_acquire(&router->lock); if (router->reconnect_pending) { router->active_logs = 0; - spinlock_release(&router->alock); + spinlock_release(&router->lock); + atomic_add(&router->handling_threads, -1); blr_restart_master(router); return; } router->active_logs = 0; - spinlock_release(&router->alock); + spinlock_release(&router->lock); + atomic_add(&router->handling_threads, -1); return; } @@ -295,139 +241,125 @@ char query[128]; MYSQL_ERROR_CODE(buf), MYSQL_ERROR_MSG(buf), blrm_states[router->master_state] ))); gwbuf_consume(buf, gwbuf_length(buf)); - spinlock_acquire(&router->alock); + spinlock_acquire(&router->lock); router->active_logs = 0; if (router->reconnect_pending) { - spinlock_release(&router->alock); + spinlock_release(&router->lock); + atomic_add(&router->handling_threads, -1); blr_restart_master(router); return; } - spinlock_release(&router->alock); + spinlock_release(&router->lock); + atomic_add(&router->handling_threads, -1); return; } - do { - switch (router->master_state) - { - case BLRM_TIMESTAMP: - // Response to a timestamp message, no need to save this. - gwbuf_consume(buf, GWBUF_LENGTH(buf)); - buf = blr_make_query("SHOW VARIABLES LIKE 'SERVER_ID'"); - router->master_state = BLRM_SERVERID; - router->master->func.write(router->master, buf); - break; - case BLRM_SERVERID: - // Response to fetch of master's server-id - router->saved_master.server_id = buf; - // TODO: Extract the value of server-id and place in router->master_id - buf = blr_make_query("SET @master_heartbeat_period = 1799999979520"); - router->master_state = BLRM_HBPERIOD; - router->master->func.write(router->master, buf); - break; - case BLRM_HBPERIOD: - // Response to set the heartbeat period - router->saved_master.heartbeat = buf; - buf = blr_make_query("SET @master_binlog_checksum = @@global.binlog_checksum"); - router->master_state = BLRM_CHKSUM1; - router->master->func.write(router->master, buf); - break; - case BLRM_CHKSUM1: - // Response to set the master binlog checksum - router->saved_master.chksum1 = buf; - buf = blr_make_query("SELECT @master_binlog_checksum"); - router->master_state = BLRM_CHKSUM2; - router->master->func.write(router->master, buf); - break; - case BLRM_CHKSUM2: - // Response to the master_binlog_checksum, should be stored - router->saved_master.chksum2 = buf; - buf = blr_make_query("SELECT @@GLOBAL.GTID_MODE"); - router->master_state = BLRM_GTIDMODE; - router->master->func.write(router->master, buf); - break; - case BLRM_GTIDMODE: - // Response to the GTID_MODE, should be stored - router->saved_master.gtid_mode = buf; - buf = blr_make_query("SHOW VARIABLES LIKE 'SERVER_UUID'"); - router->master_state = BLRM_MUUID; - router->master->func.write(router->master, buf); - break; - case BLRM_MUUID: - // Response to the SERVER_UUID, should be stored - router->saved_master.uuid = buf; - sprintf(query, "SET @slave_uuid='%s'", router->uuid); - buf = blr_make_query(query); - router->master_state = BLRM_SUUID; - router->master->func.write(router->master, buf); - break; - case BLRM_SUUID: - // Response to the SET @server_uuid, should be stored - router->saved_master.setslaveuuid = buf; - buf = blr_make_query("SET NAMES latin1"); - router->master_state = BLRM_LATIN1; - router->master->func.write(router->master, buf); - break; - case BLRM_LATIN1: - // Response to the SET NAMES latin1, should be stored - router->saved_master.setnames = buf; - buf = blr_make_query("SET NAMES utf8"); - router->master_state = BLRM_UTF8; - router->master->func.write(router->master, buf); - break; - case BLRM_UTF8: - // Response to the SET NAMES utf8, should be stored - router->saved_master.utf8 = buf; - buf = blr_make_query("SELECT 1"); - router->master_state = BLRM_SELECT1; - router->master->func.write(router->master, buf); - break; - case BLRM_SELECT1: - // Response to the SELECT 1, should be stored - router->saved_master.select1 = buf; - buf = blr_make_query("SELECT VERSION();"); - router->master_state = BLRM_SELECTVER; - router->master->func.write(router->master, buf); - break; - case BLRM_SELECTVER: - // Response to SELECT VERSION should be stored - router->saved_master.selectver = buf; - buf = blr_make_registration(router); - router->master_state = BLRM_REGISTER; - router->master->func.write(router->master, buf); - break; - case BLRM_REGISTER: - // Request a dump of the binlog file - buf = blr_make_binlog_dump(router); - router->master_state = BLRM_BINLOGDUMP; - router->master->func.write(router->master, buf); - break; - case BLRM_BINLOGDUMP: - // Main body, we have received a binlog record from the master - blr_handle_binlog_record(router, buf); - break; - } + switch (router->master_state) + { + case BLRM_TIMESTAMP: + // Response to a timestamp message, no need to save this. + gwbuf_consume(buf, GWBUF_LENGTH(buf)); + buf = blr_make_query("SHOW VARIABLES LIKE 'SERVER_ID'"); + router->master_state = BLRM_SERVERID; + router->master->func.write(router->master, buf); + break; + case BLRM_SERVERID: + // Response to fetch of master's server-id + router->saved_master.server_id = buf; + // TODO: Extract the value of server-id and place in router->master_id + buf = blr_make_query("SET @master_heartbeat_period = 1799999979520"); + router->master_state = BLRM_HBPERIOD; + router->master->func.write(router->master, buf); + break; + case BLRM_HBPERIOD: + // Response to set the heartbeat period + router->saved_master.heartbeat = buf; + buf = blr_make_query("SET @master_binlog_checksum = @@global.binlog_checksum"); + router->master_state = BLRM_CHKSUM1; + router->master->func.write(router->master, buf); + break; + case BLRM_CHKSUM1: + // Response to set the master binlog checksum + router->saved_master.chksum1 = buf; + buf = blr_make_query("SELECT @master_binlog_checksum"); + router->master_state = BLRM_CHKSUM2; + router->master->func.write(router->master, buf); + break; + case BLRM_CHKSUM2: + // Response to the master_binlog_checksum, should be stored + router->saved_master.chksum2 = buf; + buf = blr_make_query("SELECT @@GLOBAL.GTID_MODE"); + router->master_state = BLRM_GTIDMODE; + router->master->func.write(router->master, buf); + break; + case BLRM_GTIDMODE: + // Response to the GTID_MODE, should be stored + router->saved_master.gtid_mode = buf; + buf = blr_make_query("SHOW VARIABLES LIKE 'SERVER_UUID'"); + router->master_state = BLRM_MUUID; + router->master->func.write(router->master, buf); + break; + case BLRM_MUUID: + // Response to the SERVER_UUID, should be stored + router->saved_master.uuid = buf; + sprintf(query, "SET @slave_uuid='%s'", router->uuid); + buf = blr_make_query(query); + router->master_state = BLRM_SUUID; + router->master->func.write(router->master, buf); + break; + case BLRM_SUUID: + // Response to the SET @server_uuid, should be stored + router->saved_master.setslaveuuid = buf; + buf = blr_make_query("SET NAMES latin1"); + router->master_state = BLRM_LATIN1; + router->master->func.write(router->master, buf); + break; + case BLRM_LATIN1: + // Response to the SET NAMES latin1, should be stored + router->saved_master.setnames = buf; + buf = blr_make_query("SET NAMES utf8"); + router->master_state = BLRM_UTF8; + router->master->func.write(router->master, buf); + break; + case BLRM_UTF8: + // Response to the SET NAMES utf8, should be stored + router->saved_master.utf8 = buf; + buf = blr_make_query("SELECT 1"); + router->master_state = BLRM_SELECT1; + router->master->func.write(router->master, buf); + break; + case BLRM_SELECT1: + // Response to the SELECT 1, should be stored + router->saved_master.select1 = buf; + buf = blr_make_query("SELECT VERSION();"); + router->master_state = BLRM_SELECTVER; + router->master->func.write(router->master, buf); + break; + case BLRM_SELECTVER: + // Response to SELECT VERSION should be stored + router->saved_master.selectver = buf; + buf = blr_make_registration(router); + router->master_state = BLRM_REGISTER; + router->master->func.write(router->master, buf); + break; + case BLRM_REGISTER: + // Request a dump of the binlog file + buf = blr_make_binlog_dump(router); + router->master_state = BLRM_BINLOGDUMP; + router->master->func.write(router->master, buf); + break; + case BLRM_BINLOGDUMP: + // Main body, we have received a binlog record from the master + blr_handle_binlog_record(router, buf); + break; + } - /* - * Check for messages queued by other threads. - */ - spinlock_acquire(&router->alock); - if ((buf = router->queue) != NULL) - { - router->queue = NULL; - } - else - { - if (router->reconnect_pending) - { - blr_restart_master(router); - } - else - { - router->active_logs = 0; - } - } - spinlock_release(&router->alock); - } while (buf != NULL); + if (router->reconnect_pending) + blr_restart_master(router); + spinlock_acquire(&router->lock); + router->active_logs = 0; + spinlock_release(&router->lock); + atomic_add(&router->handling_threads, -1); } /** @@ -548,13 +480,15 @@ encode_value(unsigned char *data, unsigned int value, int len) static void blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) { -uint8_t *msg = NULL, *ptr, *pdata; -REP_HEADER hdr; -int len, reslen; -int no_residual = 1; -int preslen = -1; -int prev_length = -1; -int n_bufs = -1, pn_bufs = -1; +uint8_t *msg = NULL, *ptr, *pdata; +REP_HEADER hdr; +unsigned int len, reslen; +unsigned int pkt_length; +int no_residual = 1; +int preslen = -1; +int prev_length = -1; +int n_bufs = -1, pn_bufs = -1; +static REP_HEADER phdr; /* * Prepend any residual buffer to the buffer chain we have @@ -567,7 +501,8 @@ int n_bufs = -1, pn_bufs = -1; no_residual = 0; } - while (pkt && gwbuf_length(pkt) > 24) + pkt_length = gwbuf_length(pkt); + while (pkt && pkt_length > 24) { reslen = GWBUF_LENGTH(pkt); pdata = GWBUF_DATA(pkt); @@ -595,7 +530,7 @@ int n_bufs = -1, pn_bufs = -1; len = extract_field(pdata, 24) + 4; } - if (reslen < len && gwbuf_length(pkt) >= len) + if (reslen < len && pkt_length >= len) { /* * The message is contained in more than the current @@ -703,6 +638,7 @@ int n_bufs = -1, pn_bufs = -1; } break; } + phdr = hdr; if (hdr.ok == 0) { router->stats.n_binlogs++; @@ -810,6 +746,7 @@ int n_bufs = -1, pn_bufs = -1; n = (plen < len ? plen : len); pkt = gwbuf_consume(pkt, n); len -= n; + pkt_length -= n; } preslen = reslen; pn_bufs = n_bufs; @@ -822,6 +759,11 @@ int n_bufs = -1, pn_bufs = -1; if (pkt) { router->residual = pkt; + ss_dassert(pkt_length != 0); + } + else + { + ss_dassert(pkt_length == 0); } blr_file_flush(router); } @@ -892,7 +834,7 @@ char file[BINLOG_FNAMELEN+1]; memcpy(file, ptr + 8, slen); file[slen] = 0; -#ifdef VEBOSE_ROTATE +#ifdef VERBOSE_ROTATE printf("binlog rotate: "); while (len--) printf("0x%02x ", *ptr++);