Improved diagnostics
Added master reconnect on failure Added EPOLLRDHUP events
This commit is contained in:
@ -48,6 +48,9 @@
|
||||
#include <dcb.h>
|
||||
#include <spinlock.h>
|
||||
|
||||
#include <sys/types.h>
|
||||
#include <sys/socket.h>
|
||||
|
||||
#include <skygw_types.h>
|
||||
#include <skygw_utils.h>
|
||||
#include <log_manager.h>
|
||||
@ -68,6 +71,8 @@ static void *CreateMySQLAuthData(char *username, char *password, char *database)
|
||||
static void blr_extract_header(uint8_t *pkt, REP_HEADER *hdr);
|
||||
static uint32_t extract_field(uint8_t *src, int bits);
|
||||
|
||||
static int keepalive = 1;
|
||||
|
||||
/**
|
||||
* blr_start_master - controls the connection of the binlog router to the
|
||||
* master MySQL server and triggers the slave registration process for
|
||||
@ -81,15 +86,37 @@ blr_start_master(ROUTER_INSTANCE *router)
|
||||
DCB *client;
|
||||
GWBUF *buf;
|
||||
|
||||
client = dcb_alloc(DCB_ROLE_INTERNAL);
|
||||
if ((client = dcb_alloc(DCB_ROLE_INTERNAL)) == NULL)
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
|
||||
"Binlog router: failed to create DCB for dummy client\n")));
|
||||
return;
|
||||
}
|
||||
router->client = client;
|
||||
client->data = CreateMySQLAuthData(router->user, router->password, "");
|
||||
router->session = session_alloc(router->service, client);
|
||||
if ((router->session = session_alloc(router->service, client)) == NULL)
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
|
||||
"Binlog router: failed to create session for connection to master\n")));
|
||||
return;
|
||||
}
|
||||
client->session = router->session;
|
||||
router->master = dcb_connect(router->service->databases, router->session, BLR_PROTOCOL);
|
||||
if ((router->master = dcb_connect(router->service->databases, router->session, BLR_PROTOCOL)) == NULL)
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
|
||||
"Binlog router: failed to connect to master\n")));
|
||||
return;
|
||||
}
|
||||
|
||||
if (setsockopt(router->master->fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive , sizeof(keepalive )))
|
||||
perror("setsockopt");
|
||||
|
||||
router->master_state = BLRM_AUTHENTICATED;
|
||||
buf = blr_make_query("SELECT UNIX_TIMESTAMP()");
|
||||
router->master->func.write(router->master, buf);
|
||||
router->master_state = BLRM_TIMESTAMP;
|
||||
|
||||
router->stats.n_masterstarts++;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -131,6 +158,7 @@ char query[128];
|
||||
* to the point that it will not look at new packets
|
||||
* added to the queue.
|
||||
*/
|
||||
router->stats.n_queueadd++;
|
||||
router->queue = gwbuf_append(router->queue, buf);
|
||||
spinlock_release(&router->lock);
|
||||
return;
|
||||
@ -305,16 +333,17 @@ int len = 18;
|
||||
if ((buf = gwbuf_alloc(len + 4)) == NULL)
|
||||
return NULL;
|
||||
data = GWBUF_DATA(buf);
|
||||
encode_value(&data[0], len, 24); // Payload length
|
||||
data[3] = 0; // Sequence ID
|
||||
data[4] = COM_REGISTER_SLAVE; // Command
|
||||
encode_value(&data[5], router->serverid, 32); // Slave Server ID
|
||||
data[9] = 0; // Slave hostname length
|
||||
data[10] = 0; // Slave username length
|
||||
data[11] = 0; // Slave password length
|
||||
encode_value(&data[12], router->service->ports->port, 16); // Slave master port
|
||||
encode_value(&data[14], 0, 32); // Replication rank
|
||||
encode_value(&data[18], router->masterid, 32); // Master server-id
|
||||
encode_value(&data[0], len, 24); // Payload length
|
||||
data[3] = 0; // Sequence ID
|
||||
data[4] = COM_REGISTER_SLAVE; // Command
|
||||
encode_value(&data[5], router->serverid, 32); // Slave Server ID
|
||||
data[9] = 0; // Slave hostname length
|
||||
data[10] = 0; // Slave username length
|
||||
data[11] = 0; // Slave password length
|
||||
encode_value(&data[12],
|
||||
router->service->ports->port, 16); // Slave master port
|
||||
encode_value(&data[14], 0, 32); // Replication rank
|
||||
encode_value(&data[18], router->masterid, 32); // Master server-id
|
||||
|
||||
return buf;
|
||||
}
|
||||
@ -338,14 +367,16 @@ int len = 0x1b;
|
||||
return NULL;
|
||||
data = GWBUF_DATA(buf);
|
||||
|
||||
encode_value(&data[0], len,24); // Payload length
|
||||
data[3] = 0; // Sequence ID
|
||||
data[4] = COM_BINLOG_DUMP; // Command
|
||||
encode_value(&data[5], router->binlog_position, 32); // binlog position
|
||||
encode_value(&data[9], 0, 16); // Flags
|
||||
encode_value(&data[11], router->serverid, 32); // Server-id of MaxScale
|
||||
encode_value(&data[0], len,24); // Payload length
|
||||
data[3] = 0; // Sequence ID
|
||||
data[4] = COM_BINLOG_DUMP; // Command
|
||||
encode_value(&data[5],
|
||||
router->binlog_position, 32); // binlog position
|
||||
encode_value(&data[9], 0, 16); // Flags
|
||||
encode_value(&data[11],
|
||||
router->serverid, 32); // Server-id of MaxScale
|
||||
strncpy((char *)&data[15], router->binlog_name,
|
||||
BINLOG_FNAMELEN); // binlog filename
|
||||
BINLOG_FNAMELEN); // binlog filename
|
||||
return buf;
|
||||
}
|
||||
|
||||
@ -469,9 +500,14 @@ int no_residual = 1;
|
||||
/*
|
||||
* The message is not fully contained in the current
|
||||
* and we do not have the complete message in the
|
||||
* buffer chain. Therefore we must stop processing until
|
||||
* we receive the next buffer.
|
||||
* buffer chain. Therefore we must stop processing
|
||||
* until we receive the next buffer.
|
||||
*/
|
||||
router->stats.n_residuals++;
|
||||
LOGIF(LD,(skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"Residual data left after %d records.\n",
|
||||
router->stats.n_binlogs)));
|
||||
break;
|
||||
}
|
||||
else
|
||||
@ -486,13 +522,19 @@ int no_residual = 1;
|
||||
|
||||
if (hdr.event_size != len - 5)
|
||||
{
|
||||
printf("Packet length is %d, but event size is %d\n",
|
||||
len, hdr.event_size);
|
||||
abort();
|
||||
LOGIF(LE,(skygw_log_write(
|
||||
LOGFILE_ERROR,
|
||||
"Packet length is %d, but event size is %d, "
|
||||
"binlog file %s position %d",
|
||||
len, hdr.event_size,
|
||||
router->binlog_name,
|
||||
router->binlog_position)));
|
||||
break;
|
||||
}
|
||||
if (hdr.ok == 0)
|
||||
{
|
||||
router->stats.n_binlogs++;
|
||||
router->lastEventReceived = hdr.event_type;
|
||||
|
||||
// #define SHOW_EVENTS
|
||||
#ifdef SHOW_EVENTS
|
||||
@ -526,7 +568,7 @@ int no_residual = 1;
|
||||
#ifdef SHOW_EVENTS
|
||||
printf("Replication heartbeat\n");
|
||||
#endif
|
||||
;
|
||||
router->stats.n_heartbeats++;
|
||||
}
|
||||
else if (hdr.flags != LOG_EVENT_ARTIFICIAL_F)
|
||||
{
|
||||
|
Reference in New Issue
Block a user