From 13e95ffc536d39b4bef3c4594fb8439ec208f14c Mon Sep 17 00:00:00 2001 From: Mark Riddoch Date: Sat, 7 Jun 2014 21:01:11 +0100 Subject: [PATCH] Improved diagnostics Added master reconnect on failure Added EPOLLRDHUP events --- server/core/config.c | 21 ++++- server/core/dcb.c | 1 + server/core/poll.c | 21 ++++- server/modules/include/blr.h | 7 ++ server/modules/protocol/mysql_backend.c | 25 ++++++ server/modules/routing/binlog/blr.c | 43 +++++++++- server/modules/routing/binlog/blr_master.c | 94 ++++++++++++++++------ 7 files changed, 180 insertions(+), 32 deletions(-) diff --git a/server/core/config.c b/server/core/config.c index dad4c4022..ee6a1d3bb 100644 --- a/server/core/config.c +++ b/server/core/config.c @@ -602,6 +602,15 @@ config_threadcount() return gateway.n_threads; } +static struct { + char *logname; + logfile_id_t logfile; +} lognames[] = { + { "log_messages", LOGFILE_MESSAGE }, + { "log_trace", LOGFILE_TRACE }, + { "log_debug", LOGFILE_DEBUG }, + { NULL, 0 } +}; /** * Configuration handler for items in the global [MaxScale] section * @@ -612,10 +621,20 @@ config_threadcount() static int handle_global_item(const char *name, const char *value) { +int i; if (strcmp(name, "threads") == 0) { gateway.n_threads = atoi(value); } else { - return 0; + for (i = 0; lognames[i].logname; i++) + { + if (strcasecmp(name, lognames[i].logname) == 0) + { + if (atoi(value)) + skygw_log_enable(lognames[i].logfile); + else + skygw_log_disable(lognames[i].logfile); + } + } } return 1; } diff --git a/server/core/dcb.c b/server/core/dcb.c index 52a9bbfc9..15f4ea637 100644 --- a/server/core/dcb.c +++ b/server/core/dcb.c @@ -1131,6 +1131,7 @@ dprintDCB(DCB *pdcb, DCB *dcb) dcb_printf(pdcb, "\tConnected to: %s\n", dcb->remote); dcb_printf(pdcb, "\tOwning Session: %d\n", dcb->session); dcb_printf(pdcb, "\tQueued write data: %d\n", gwbuf_length(dcb->writeq)); + dcb_printf(pdcb, "\tDelayed write data: %d\n", gwbuf_length(dcb->delayq)); dcb_printf(pdcb, "\tStatistics:\n"); dcb_printf(pdcb, "\t\tNo. of Reads: %d\n", dcb->stats.n_reads); dcb_printf(pdcb, "\t\tNo. of Writes: %d\n", dcb->stats.n_writes); diff --git a/server/core/poll.c b/server/core/poll.c index f1c65ebea..365b05e0e 100644 --- a/server/core/poll.c +++ b/server/core/poll.c @@ -99,7 +99,7 @@ poll_add_dcb(DCB *dcb) CHK_DCB(dcb); - ev.events = EPOLLIN | EPOLLOUT | EPOLLET; + ev.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLET; ev.data.ptr = dcb; /*< @@ -474,6 +474,25 @@ poll_waitevents(void *arg) atomic_add(&pollStats.n_hup, 1); dcb->func.hangup(dcb); } + + if (ev & EPOLLRDHUP) + { + int eno = 0; + eno = gw_getsockerrno(dcb->fd); + + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [poll_waitevents] " + "EPOLLRDHUP on dcb %p, fd %d. " + "Errno %d, %s.", + pthread_self(), + dcb, + dcb->fd, + eno, + strerror(eno)))); + atomic_add(&pollStats.n_hup, 1); + dcb->func.hangup(dcb); + } } /*< for */ no_op = FALSE; } diff --git a/server/modules/include/blr.h b/server/modules/include/blr.h index 865f57f3d..b00a63ad0 100644 --- a/server/modules/include/blr.h +++ b/server/modules/include/blr.h @@ -110,6 +110,11 @@ typedef struct { uint64_t n_cachehits; /*< Number of hits on the binlog cache */ uint64_t n_cachemisses; /*< Number of misses on the binlog cache */ unsigned int n_registered; /*< Number of registered slaves */ + int n_masterstarts; /*< Numebr of times connection restarted */ + int n_queueadd; /*< Numebr of times incoming data was added to processign queue */ + int n_residuals; /*< Number of times residual data was buffered */ + unsigned int n_heartbeats; /*< Number of heartbeat messages */ + time_t lastReply; uint64_t n_fakeevents; /*< Fake events not written to disk */ uint64_t events[0x24]; /*< Per event counters */ } ROUTER_STATS; @@ -175,8 +180,10 @@ typedef struct router_instance { char *password; /*< Password to use with master */ char *fileroot; /*< Root of binlog filename */ DCB *master; /*< DCB for master connection */ + DCB *client; /*< DCB for dummy client */ SESSION *session; /*< Fake session for master connection */ unsigned int master_state; /*< State of the master FSM */ + uint8_t lastEventReceived; GWBUF *residual; /*< Any residual binlog event */ MASTER_RESPONSES saved_master; /*< Saved master responses */ char binlog_name[BINLOG_FNAMELEN+1]; diff --git a/server/modules/protocol/mysql_backend.c b/server/modules/protocol/mysql_backend.c index f1774485d..1653de5f0 100644 --- a/server/modules/protocol/mysql_backend.c +++ b/server/modules/protocol/mysql_backend.c @@ -792,6 +792,31 @@ return_fd: static int gw_backend_hangup(DCB *dcb) { + SESSION *session; + void *rsession; + ROUTER_OBJECT *router; + ROUTER *router_instance; + int rc = 0; + + session = dcb->session; + + if (session->state == SESSION_STATE_ROUTER_READY) + { + router = session->service->router; + router_instance = session->service->router_instance; + rsession = session->router_session; + /*< + * rsession should never be NULL here. + */ + LOGIF(LD, (skygw_log_write_flush( + LOGFILE_DEBUG, + "%lu [gw_backend_hangup] " + "Call closeSession for backend " + "session.", + pthread_self()))); + + router->closeSession(router_instance, rsession); + } /*< vraa : errorHandle */ return 1; } diff --git a/server/modules/routing/binlog/blr.c b/server/modules/routing/binlog/blr.c index a6f2bb921..006b30861 100644 --- a/server/modules/routing/binlog/blr.c +++ b/server/modules/routing/binlog/blr.c @@ -39,6 +39,7 @@ #include #include #include +#include #include #include #include @@ -47,6 +48,7 @@ #include #include #include +#include #include #include @@ -436,8 +438,13 @@ ROUTER_SLAVE *slave = (ROUTER_SLAVE *)router_session; * * TODO: Handle closure of master session */ - LOGIF(LD, (skygw_log_write_flush( + LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, "Binlog router close session with master"))); + router->master_state = BLRM_UNCONNECTED; + dcb_close(router->master); + dcb_free(router->master); + dcb_free(router->client); + blr_start_master(router); return; } CHK_CLIENT_RSES(slave); @@ -504,8 +511,10 @@ static void diagnostics(ROUTER *router, DCB *dcb) { ROUTER_INSTANCE *router_inst = (ROUTER_INSTANCE *)router; -ROUTER_SLAVE *session; -int i = 0; +ROUTER_SLAVE *session; +int i = 0; +char buf[40]; +struct tm tm; spinlock_acquire(&router_inst->lock); session = router_inst->slaves; @@ -515,7 +524,17 @@ int i = 0; session = session->next; } spinlock_release(&router_inst->lock); + + dcb_printf(dcb, "\tMaster connection DCB: %p\n", + router_inst->master); + dcb_printf(dcb, "\tMaster connection state: %s\n", + blrm_states[router_inst->master_state]); + + localtime_r(&router_inst->stats.lastReply, &tm); + asctime_r(&tm, buf); + dcb_printf(dcb, "\tNumber of master connects: %d\n", + router_inst->stats.n_masterstarts); dcb_printf(dcb, "\tCurrent binlog file: %s\n", router_inst->binlog_name); dcb_printf(dcb, "\tCurrent binlog position: %u\n", @@ -524,7 +543,7 @@ int i = 0; router_inst->stats.n_slaves); dcb_printf(dcb, "\tNumber of binlog events received: %u\n", router_inst->stats.n_binlogs); - dcb_printf(dcb, "\tNumber of fake binlog events: %u\n", + dcb_printf(dcb, "\tNumber of fake binlog events: %u\n", router_inst->stats.n_fakeevents); dcb_printf(dcb, "\tNumber of binlog events in error: %u\n", router_inst->stats.n_binlog_errors); @@ -534,10 +553,23 @@ int i = 0; router_inst->stats.n_cachehits); dcb_printf(dcb, "\tNumber of binlog cache misses: %u\n", router_inst->stats.n_cachemisses); + dcb_printf(dcb, "\tNumber of heartbeat events: %u\n", + router_inst->stats.n_heartbeats); dcb_printf(dcb, "\tNumber of packets received: %u\n", router_inst->stats.n_reads); + dcb_printf(dcb, "\tNumber of packets queued: %u\n", + router_inst->stats.n_queueadd); + dcb_printf(dcb, "\tCurrent length of incoming queue: %d\n", + gwbuf_length(router_inst->queue)); + dcb_printf(dcb, "\tNumber of residual data packets: %u\n", + router_inst->stats.n_residuals); dcb_printf(dcb, "\tAverage events per packet %.1f\n", (double)router_inst->stats.n_binlogs / router_inst->stats.n_reads); + dcb_printf(dcb, "\tLast event from master at: %s\n", buf); + dcb_printf(dcb, "\tLast event from master: 0x%x\n", + router_inst->lastEventReceived); + if (router_inst->active_logs) + dcb_printf(dcb, "\tRouter processing binlog records\n"); dcb_printf(dcb, "\tEvents received:\n"); for (i = 0; i < 0x24; i++) { @@ -596,6 +628,7 @@ ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance; atomic_add(&router->stats.n_reads, 1); blr_master_response(router, queue); + router->stats.lastReply = time(0); } /** @@ -619,6 +652,8 @@ errorReply( DCB *backend_dcb, int action) { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, "Erorr Reply '%s'", message))); } /** to be inline'd */ diff --git a/server/modules/routing/binlog/blr_master.c b/server/modules/routing/binlog/blr_master.c index 6dee211d8..6f3afd8c7 100644 --- a/server/modules/routing/binlog/blr_master.c +++ b/server/modules/routing/binlog/blr_master.c @@ -48,6 +48,9 @@ #include #include +#include +#include + #include #include #include @@ -68,6 +71,8 @@ static void *CreateMySQLAuthData(char *username, char *password, char *database) static void blr_extract_header(uint8_t *pkt, REP_HEADER *hdr); static uint32_t extract_field(uint8_t *src, int bits); +static int keepalive = 1; + /** * blr_start_master - controls the connection of the binlog router to the * master MySQL server and triggers the slave registration process for @@ -81,15 +86,37 @@ blr_start_master(ROUTER_INSTANCE *router) DCB *client; GWBUF *buf; - client = dcb_alloc(DCB_ROLE_INTERNAL); + if ((client = dcb_alloc(DCB_ROLE_INTERNAL)) == NULL) + { + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + "Binlog router: failed to create DCB for dummy client\n"))); + return; + } + router->client = client; client->data = CreateMySQLAuthData(router->user, router->password, ""); - router->session = session_alloc(router->service, client); + if ((router->session = session_alloc(router->service, client)) == NULL) + { + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + "Binlog router: failed to create session for connection to master\n"))); + return; + } client->session = router->session; - router->master = dcb_connect(router->service->databases, router->session, BLR_PROTOCOL); + if ((router->master = dcb_connect(router->service->databases, router->session, BLR_PROTOCOL)) == NULL) + { + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + "Binlog router: failed to connect to master\n"))); + return; + } + +if (setsockopt(router->master->fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive , sizeof(keepalive ))) +perror("setsockopt"); + router->master_state = BLRM_AUTHENTICATED; buf = blr_make_query("SELECT UNIX_TIMESTAMP()"); router->master->func.write(router->master, buf); router->master_state = BLRM_TIMESTAMP; + + router->stats.n_masterstarts++; } /** @@ -131,6 +158,7 @@ char query[128]; * to the point that it will not look at new packets * added to the queue. */ + router->stats.n_queueadd++; router->queue = gwbuf_append(router->queue, buf); spinlock_release(&router->lock); return; @@ -305,16 +333,17 @@ int len = 18; if ((buf = gwbuf_alloc(len + 4)) == NULL) return NULL; data = GWBUF_DATA(buf); - encode_value(&data[0], len, 24); // Payload length - data[3] = 0; // Sequence ID - data[4] = COM_REGISTER_SLAVE; // Command - encode_value(&data[5], router->serverid, 32); // Slave Server ID - data[9] = 0; // Slave hostname length - data[10] = 0; // Slave username length - data[11] = 0; // Slave password length - encode_value(&data[12], router->service->ports->port, 16); // Slave master port - encode_value(&data[14], 0, 32); // Replication rank - encode_value(&data[18], router->masterid, 32); // Master server-id + encode_value(&data[0], len, 24); // Payload length + data[3] = 0; // Sequence ID + data[4] = COM_REGISTER_SLAVE; // Command + encode_value(&data[5], router->serverid, 32); // Slave Server ID + data[9] = 0; // Slave hostname length + data[10] = 0; // Slave username length + data[11] = 0; // Slave password length + encode_value(&data[12], + router->service->ports->port, 16); // Slave master port + encode_value(&data[14], 0, 32); // Replication rank + encode_value(&data[18], router->masterid, 32); // Master server-id return buf; } @@ -338,14 +367,16 @@ int len = 0x1b; return NULL; data = GWBUF_DATA(buf); - encode_value(&data[0], len,24); // Payload length - data[3] = 0; // Sequence ID - data[4] = COM_BINLOG_DUMP; // Command - encode_value(&data[5], router->binlog_position, 32); // binlog position - encode_value(&data[9], 0, 16); // Flags - encode_value(&data[11], router->serverid, 32); // Server-id of MaxScale + encode_value(&data[0], len,24); // Payload length + data[3] = 0; // Sequence ID + data[4] = COM_BINLOG_DUMP; // Command + encode_value(&data[5], + router->binlog_position, 32); // binlog position + encode_value(&data[9], 0, 16); // Flags + encode_value(&data[11], + router->serverid, 32); // Server-id of MaxScale strncpy((char *)&data[15], router->binlog_name, - BINLOG_FNAMELEN); // binlog filename + BINLOG_FNAMELEN); // binlog filename return buf; } @@ -469,9 +500,14 @@ int no_residual = 1; /* * The message is not fully contained in the current * and we do not have the complete message in the - * buffer chain. Therefore we must stop processing until - * we receive the next buffer. + * buffer chain. Therefore we must stop processing + * until we receive the next buffer. */ + router->stats.n_residuals++; + LOGIF(LD,(skygw_log_write( + LOGFILE_DEBUG, + "Residual data left after %d records.\n", + router->stats.n_binlogs))); break; } else @@ -486,13 +522,19 @@ int no_residual = 1; if (hdr.event_size != len - 5) { - printf("Packet length is %d, but event size is %d\n", - len, hdr.event_size); - abort(); + LOGIF(LE,(skygw_log_write( + LOGFILE_ERROR, + "Packet length is %d, but event size is %d, " + "binlog file %s position %d", + len, hdr.event_size, + router->binlog_name, + router->binlog_position))); + break; } if (hdr.ok == 0) { router->stats.n_binlogs++; + router->lastEventReceived = hdr.event_type; // #define SHOW_EVENTS #ifdef SHOW_EVENTS @@ -526,7 +568,7 @@ int no_residual = 1; #ifdef SHOW_EVENTS printf("Replication heartbeat\n"); #endif - ; + router->stats.n_heartbeats++; } else if (hdr.flags != LOG_EVENT_ARTIFICIAL_F) {