Merging binlog router into beta refresh
This commit is contained in:
@ -332,7 +332,7 @@ DOWNSTREAM *me;
|
|||||||
if ((filter->obj = load_module(filter->module,
|
if ((filter->obj = load_module(filter->module,
|
||||||
MODULE_FILTER)) == NULL)
|
MODULE_FILTER)) == NULL)
|
||||||
{
|
{
|
||||||
return NULL:
|
return NULL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -188,6 +188,7 @@ typedef struct {
|
|||||||
int n_slaves; /*< Number slave sessions created */
|
int n_slaves; /*< Number slave sessions created */
|
||||||
int n_reads; /*< Number of record reads */
|
int n_reads; /*< Number of record reads */
|
||||||
uint64_t n_binlogs; /*< Number of binlog records from master */
|
uint64_t n_binlogs; /*< Number of binlog records from master */
|
||||||
|
uint64_t n_binlogs_ses; /*< Number of binlog records from master */
|
||||||
uint64_t n_binlog_errors;/*< Number of binlog records from master */
|
uint64_t n_binlog_errors;/*< Number of binlog records from master */
|
||||||
uint64_t n_rotates; /*< Number of binlog rotate events */
|
uint64_t n_rotates; /*< Number of binlog rotate events */
|
||||||
uint64_t n_cachehits; /*< Number of hits on the binlog cache */
|
uint64_t n_cachehits; /*< Number of hits on the binlog cache */
|
||||||
@ -265,6 +266,7 @@ typedef struct router_instance {
|
|||||||
unsigned int short_burst; /*< Short burst for slave catchup */
|
unsigned int short_burst; /*< Short burst for slave catchup */
|
||||||
unsigned int long_burst; /*< Long burst for slave catchup */
|
unsigned int long_burst; /*< Long burst for slave catchup */
|
||||||
unsigned long burst_size; /*< Maximum size of burst to send */
|
unsigned long burst_size; /*< Maximum size of burst to send */
|
||||||
|
unsigned long heartbeat; /*< Configured heartbeat value */
|
||||||
ROUTER_STATS stats; /*< Statistics for this router */
|
ROUTER_STATS stats; /*< Statistics for this router */
|
||||||
int active_logs;
|
int active_logs;
|
||||||
int reconnect_pending;
|
int reconnect_pending;
|
||||||
|
@ -962,11 +962,20 @@ static int gw_create_backend_connection(
|
|||||||
}
|
}
|
||||||
|
|
||||||
/** Copy client flags to backend protocol */
|
/** Copy client flags to backend protocol */
|
||||||
protocol->client_capabilities =
|
if (backend_dcb->session->client->protocol)
|
||||||
((MySQLProtocol *)(backend_dcb->session->client->protocol))->client_capabilities;
|
{
|
||||||
/** Copy client charset to backend protocol */
|
protocol->client_capabilities =
|
||||||
protocol->charset =
|
((MySQLProtocol *)(backend_dcb->session->client->protocol))->client_capabilities;
|
||||||
((MySQLProtocol *)(backend_dcb->session->client->protocol))->charset;
|
/** Copy client charset to backend protocol */
|
||||||
|
protocol->charset =
|
||||||
|
((MySQLProtocol *)(backend_dcb->session->client->protocol))->charset;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
protocol->client_capabilities = GW_MYSQL_CAPABILITIES_PROTOCOL_41|
|
||||||
|
GW_MYSQL_CAPABILITIES_CLIENT;
|
||||||
|
protocol->charset = 33;
|
||||||
|
}
|
||||||
|
|
||||||
/*< if succeed, fd > 0, -1 otherwise */
|
/*< if succeed, fd > 0, -1 otherwise */
|
||||||
rv = gw_do_connect_to_backend(server->name, server->port, &fd);
|
rv = gw_do_connect_to_backend(server->name, server->port, &fd);
|
||||||
|
@ -188,6 +188,8 @@ int i;
|
|||||||
inst->long_burst = DEF_LONG_BURST;
|
inst->long_burst = DEF_LONG_BURST;
|
||||||
inst->burst_size = DEF_BURST_SIZE;
|
inst->burst_size = DEF_BURST_SIZE;
|
||||||
inst->retry_backoff = 1;
|
inst->retry_backoff = 1;
|
||||||
|
inst->binlogdir = NULL;
|
||||||
|
inst->heartbeat = 300; // Default is every 5 minutes
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We only support one server behind this router, since the server is
|
* We only support one server behind this router, since the server is
|
||||||
@ -308,6 +310,14 @@ int i;
|
|||||||
inst->burst_size = size;
|
inst->burst_size = size;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
else if (strcmp(options[i], "heartbeat") == 0)
|
||||||
|
{
|
||||||
|
inst->heartbeat = atoi(value);
|
||||||
|
}
|
||||||
|
else if (strcmp(options[i], "binlogdir") == 0)
|
||||||
|
{
|
||||||
|
inst->binlogdir = strdup(value);
|
||||||
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
LOGIF(LE, (skygw_log_write(
|
LOGIF(LE, (skygw_log_write(
|
||||||
@ -643,25 +653,29 @@ struct tm tm;
|
|||||||
min5 /= 5.0;
|
min5 /= 5.0;
|
||||||
|
|
||||||
|
|
||||||
dcb_printf(dcb, "\tMaster connection DCB: %p\n",
|
dcb_printf(dcb, "\tMaster connection DCB: %p\n",
|
||||||
router_inst->master);
|
router_inst->master);
|
||||||
dcb_printf(dcb, "\tMaster connection state: %s\n",
|
dcb_printf(dcb, "\tMaster connection state: %s\n",
|
||||||
blrm_states[router_inst->master_state]);
|
blrm_states[router_inst->master_state]);
|
||||||
|
|
||||||
localtime_r(&router_inst->stats.lastReply, &tm);
|
localtime_r(&router_inst->stats.lastReply, &tm);
|
||||||
asctime_r(&tm, buf);
|
asctime_r(&tm, buf);
|
||||||
|
|
||||||
dcb_printf(dcb, "\tNumber of master connects: %d\n",
|
dcb_printf(dcb, "\tBinlog directory: %s\n",
|
||||||
|
router_inst->binlogdir);
|
||||||
|
dcb_printf(dcb, "\tNumber of master connects: %d\n",
|
||||||
router_inst->stats.n_masterstarts);
|
router_inst->stats.n_masterstarts);
|
||||||
dcb_printf(dcb, "\tNumber of delayed reconnects: %d\n",
|
dcb_printf(dcb, "\tNumber of delayed reconnects: %d\n",
|
||||||
router_inst->stats.n_delayedreconnects);
|
router_inst->stats.n_delayedreconnects);
|
||||||
dcb_printf(dcb, "\tCurrent binlog file: %s\n",
|
dcb_printf(dcb, "\tCurrent binlog file: %s\n",
|
||||||
router_inst->binlog_name);
|
router_inst->binlog_name);
|
||||||
dcb_printf(dcb, "\tCurrent binlog position: %u\n",
|
dcb_printf(dcb, "\tCurrent binlog position: %u\n",
|
||||||
router_inst->binlog_position);
|
router_inst->binlog_position);
|
||||||
dcb_printf(dcb, "\tNumber of slave servers: %u\n",
|
dcb_printf(dcb, "\tNumber of slave servers: %u\n",
|
||||||
router_inst->stats.n_slaves);
|
router_inst->stats.n_slaves);
|
||||||
dcb_printf(dcb, "\tNumber of binlog events received: %u\n",
|
dcb_printf(dcb, "\tNo. of binlog events received this session: %u\n",
|
||||||
|
router_inst->stats.n_binlogs_ses);
|
||||||
|
dcb_printf(dcb, "\tTotal no. of binlog events received: %u\n",
|
||||||
router_inst->stats.n_binlogs);
|
router_inst->stats.n_binlogs);
|
||||||
minno = router_inst->stats.minno - 1;
|
minno = router_inst->stats.minno - 1;
|
||||||
if (minno == -1)
|
if (minno == -1)
|
||||||
@ -670,27 +684,27 @@ struct tm tm;
|
|||||||
dcb_printf(dcb, "\tCurrent 5 10 15 30 Min Avg\n");
|
dcb_printf(dcb, "\tCurrent 5 10 15 30 Min Avg\n");
|
||||||
dcb_printf(dcb, "\t %6d %8.1f %8.1f %8.1f %8.1f\n",
|
dcb_printf(dcb, "\t %6d %8.1f %8.1f %8.1f %8.1f\n",
|
||||||
router_inst->stats.minavgs[minno], min5, min10, min15, min30);
|
router_inst->stats.minavgs[minno], min5, min10, min15, min30);
|
||||||
dcb_printf(dcb, "\tNumber of fake binlog events: %u\n",
|
dcb_printf(dcb, "\tNumber of fake binlog events: %u\n",
|
||||||
router_inst->stats.n_fakeevents);
|
router_inst->stats.n_fakeevents);
|
||||||
dcb_printf(dcb, "\tNumber of artificial binlog events: %u\n",
|
dcb_printf(dcb, "\tNumber of artificial binlog events: %u\n",
|
||||||
router_inst->stats.n_artificial);
|
router_inst->stats.n_artificial);
|
||||||
dcb_printf(dcb, "\tNumber of binlog events in error: %u\n",
|
dcb_printf(dcb, "\tNumber of binlog events in error: %u\n",
|
||||||
router_inst->stats.n_binlog_errors);
|
router_inst->stats.n_binlog_errors);
|
||||||
dcb_printf(dcb, "\tNumber of binlog rotate events: %u\n",
|
dcb_printf(dcb, "\tNumber of binlog rotate events: %u\n",
|
||||||
router_inst->stats.n_rotates);
|
router_inst->stats.n_rotates);
|
||||||
dcb_printf(dcb, "\tNumber of heartbeat events: %u\n",
|
dcb_printf(dcb, "\tNumber of heartbeat events: %u\n",
|
||||||
router_inst->stats.n_heartbeats);
|
router_inst->stats.n_heartbeats);
|
||||||
dcb_printf(dcb, "\tNumber of packets received: %u\n",
|
dcb_printf(dcb, "\tNumber of packets received: %u\n",
|
||||||
router_inst->stats.n_reads);
|
router_inst->stats.n_reads);
|
||||||
dcb_printf(dcb, "\tNumber of residual data packets: %u\n",
|
dcb_printf(dcb, "\tNumber of residual data packets: %u\n",
|
||||||
router_inst->stats.n_residuals);
|
router_inst->stats.n_residuals);
|
||||||
dcb_printf(dcb, "\tAverage events per packet %.1f\n",
|
dcb_printf(dcb, "\tAverage events per packet %.1f\n",
|
||||||
(double)router_inst->stats.n_binlogs / router_inst->stats.n_reads);
|
(double)router_inst->stats.n_binlogs / router_inst->stats.n_reads);
|
||||||
dcb_printf(dcb, "\tLast event from master at: %s",
|
dcb_printf(dcb, "\tLast event from master at: %s",
|
||||||
buf);
|
buf);
|
||||||
dcb_printf(dcb, "\t (%d seconds ago)\n",
|
dcb_printf(dcb, "\t (%d seconds ago)\n",
|
||||||
time(0) - router_inst->stats.lastReply);
|
time(0) - router_inst->stats.lastReply);
|
||||||
dcb_printf(dcb, "\tLast event from master: 0x%x\n",
|
dcb_printf(dcb, "\tLast event from master: 0x%x\n",
|
||||||
router_inst->lastEventReceived);
|
router_inst->lastEventReceived);
|
||||||
if (router_inst->active_logs)
|
if (router_inst->active_logs)
|
||||||
dcb_printf(dcb, "\tRouter processing binlog records\n");
|
dcb_printf(dcb, "\tRouter processing binlog records\n");
|
||||||
@ -699,7 +713,7 @@ struct tm tm;
|
|||||||
dcb_printf(dcb, "\tEvents received:\n");
|
dcb_printf(dcb, "\tEvents received:\n");
|
||||||
for (i = 0; i < 0x24; i++)
|
for (i = 0; i < 0x24; i++)
|
||||||
{
|
{
|
||||||
dcb_printf(dcb, "\t\t%-38s: %u\n", event_names[i], router_inst->stats.events[i]);
|
dcb_printf(dcb, "\t\t%-38s %u\n", event_names[i], router_inst->stats.events[i]);
|
||||||
}
|
}
|
||||||
|
|
||||||
#if SPINLOCK_PROFILE
|
#if SPINLOCK_PROFILE
|
||||||
|
@ -77,18 +77,27 @@ int root_len, i;
|
|||||||
DIR *dirp;
|
DIR *dirp;
|
||||||
struct dirent *dp;
|
struct dirent *dp;
|
||||||
|
|
||||||
strcpy(path, "/usr/local/skysql/MaxScale");
|
if (router->binlogdir == NULL)
|
||||||
if ((ptr = getenv("MAXSCALE_HOME")) != NULL)
|
|
||||||
{
|
{
|
||||||
strcpy(path, ptr);
|
strcpy(path, "/usr/local/skysql/MaxScale");
|
||||||
|
if ((ptr = getenv("MAXSCALE_HOME")) != NULL)
|
||||||
|
{
|
||||||
|
strcpy(path, ptr);
|
||||||
|
}
|
||||||
|
strcat(path, "/");
|
||||||
|
strcat(path, router->service->name);
|
||||||
|
|
||||||
|
if (access(path, R_OK) == -1)
|
||||||
|
mkdir(path, 0777);
|
||||||
|
|
||||||
|
router->binlogdir = strdup(path);
|
||||||
|
}
|
||||||
|
if (access(router->binlogdir, R_OK) == -1)
|
||||||
|
{
|
||||||
|
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
|
||||||
|
"%s: Unable to read the binlog directory %s.",
|
||||||
|
router->service->name, router->binlogdir)));
|
||||||
}
|
}
|
||||||
strcat(path, "/");
|
|
||||||
strcat(path, router->service->name);
|
|
||||||
|
|
||||||
if (access(path, R_OK) == -1)
|
|
||||||
mkdir(path, 0777);
|
|
||||||
|
|
||||||
router->binlogdir = strdup(path);
|
|
||||||
|
|
||||||
/* First try to find a binlog file number by reading the directory */
|
/* First try to find a binlog file number by reading the directory */
|
||||||
root_len = strlen(router->fileroot);
|
root_len = strlen(router->fileroot);
|
||||||
|
@ -93,6 +93,7 @@ blr_start_master(ROUTER_INSTANCE *router)
|
|||||||
DCB *client;
|
DCB *client;
|
||||||
GWBUF *buf;
|
GWBUF *buf;
|
||||||
|
|
||||||
|
router->stats.n_binlogs_ses = 0;
|
||||||
if ((client = dcb_alloc(DCB_ROLE_INTERNAL)) == NULL)
|
if ((client = dcb_alloc(DCB_ROLE_INTERNAL)) == NULL)
|
||||||
{
|
{
|
||||||
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
|
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
|
||||||
@ -101,6 +102,7 @@ GWBUF *buf;
|
|||||||
}
|
}
|
||||||
router->client = client;
|
router->client = client;
|
||||||
client->data = CreateMySQLAuthData(router->user, router->password, "");
|
client->data = CreateMySQLAuthData(router->user, router->password, "");
|
||||||
|
client->state = DCB_STATE_POLLING; // Lie to keep the protocol module happy
|
||||||
if ((router->session = session_alloc(router->service, client)) == NULL)
|
if ((router->session = session_alloc(router->service, client)) == NULL)
|
||||||
{
|
{
|
||||||
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
|
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
|
||||||
@ -279,7 +281,11 @@ char query[128];
|
|||||||
// Response to fetch of master's server-id
|
// Response to fetch of master's server-id
|
||||||
router->saved_master.server_id = buf;
|
router->saved_master.server_id = buf;
|
||||||
// TODO: Extract the value of server-id and place in router->master_id
|
// TODO: Extract the value of server-id and place in router->master_id
|
||||||
buf = blr_make_query("SET @master_heartbeat_period = 1799999979520");
|
{
|
||||||
|
char str[80];
|
||||||
|
sprintf(str, "SET @master_heartbeat_period = %lu000000000", router->heartbeat);
|
||||||
|
buf = blr_make_query(str);
|
||||||
|
}
|
||||||
router->master_state = BLRM_HBPERIOD;
|
router->master_state = BLRM_HBPERIOD;
|
||||||
router->master->func.write(router->master, buf);
|
router->master->func.write(router->master, buf);
|
||||||
break;
|
break;
|
||||||
@ -654,6 +660,7 @@ static REP_HEADER phdr;
|
|||||||
if (hdr.ok == 0)
|
if (hdr.ok == 0)
|
||||||
{
|
{
|
||||||
router->stats.n_binlogs++;
|
router->stats.n_binlogs++;
|
||||||
|
router->stats.n_binlogs_ses++;
|
||||||
router->lastEventReceived = hdr.event_type;
|
router->lastEventReceived = hdr.event_type;
|
||||||
|
|
||||||
// #define SHOW_EVENTS
|
// #define SHOW_EVENTS
|
||||||
|
@ -577,8 +577,9 @@ uint32_t chksum;
|
|||||||
|
|
||||||
LOGIF(LM, (skygw_log_write(
|
LOGIF(LM, (skygw_log_write(
|
||||||
LOGFILE_MESSAGE,
|
LOGFILE_MESSAGE,
|
||||||
"%s: New slave %s requested binlog file %s from position %lu",
|
"%s: New slave %s, server id %d, requested binlog file %s from position %lu",
|
||||||
router->service->name, slave->dcb->remote,
|
router->service->name, slave->dcb->remote,
|
||||||
|
slave->serverid,
|
||||||
slave->binlogfile, slave->binlog_pos)));
|
slave->binlogfile, slave->binlog_pos)));
|
||||||
|
|
||||||
if (slave->binlog_pos != router->binlog_position ||
|
if (slave->binlog_pos != router->binlog_position ||
|
||||||
|
Reference in New Issue
Block a user