Add initial file number
Fix for short binlog file names in rotate Socket buffering changes
This commit is contained in:
@ -1219,7 +1219,7 @@ printDCB(DCB *dcb)
|
|||||||
static void
|
static void
|
||||||
spin_reporter(void *dcb, char *desc, int value)
|
spin_reporter(void *dcb, char *desc, int value)
|
||||||
{
|
{
|
||||||
dcb_printf((DCB *)dcb, "\t\t%-35s %d\n", desc, value);
|
dcb_printf((DCB *)dcb, "\t\t%-40s %d\n", desc, value);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -28,10 +28,10 @@
|
|||||||
* Configuration for send and receive socket buffer sizes for
|
* Configuration for send and receive socket buffer sizes for
|
||||||
* backend and cleint connections.
|
* backend and cleint connections.
|
||||||
*/
|
*/
|
||||||
#define GW_BACKEND_SO_SNDBUF 32768
|
#define GW_BACKEND_SO_SNDBUF (128 * 1024)
|
||||||
#define GW_BACKEND_SO_RCVBUF 32768
|
#define GW_BACKEND_SO_RCVBUF (128 * 1024)
|
||||||
#define GW_CLIENT_SO_SNDBUF 32768
|
#define GW_CLIENT_SO_SNDBUF (128 * 1024)
|
||||||
#define GW_CLIENT_SO_RCVBUF 32768
|
#define GW_CLIENT_SO_RCVBUF (128 * 1024)
|
||||||
|
|
||||||
#define GW_NOINTR_CALL(A) do { errno = 0; A; } while (errno == EINTR)
|
#define GW_NOINTR_CALL(A) do { errno = 0; A; } while (errno == EINTR)
|
||||||
#define GW_MYSQL_LOOP_TIMEOUT 300000000
|
#define GW_MYSQL_LOOP_TIMEOUT 300000000
|
||||||
|
@ -1180,9 +1180,9 @@ int gw_MySQLAccept(DCB *listener)
|
|||||||
conn_open[c_sock] = true;
|
conn_open[c_sock] = true;
|
||||||
#endif
|
#endif
|
||||||
/* set nonblocking */
|
/* set nonblocking */
|
||||||
sendbuf = GW_BACKEND_SO_SNDBUF;
|
sendbuf = GW_CLIENT_SO_SNDBUF;
|
||||||
setsockopt(c_sock, SOL_SOCKET, SO_SNDBUF, &sendbuf, optlen);
|
setsockopt(c_sock, SOL_SOCKET, SO_SNDBUF, &sendbuf, optlen);
|
||||||
sendbuf = GW_BACKEND_SO_RCVBUF;
|
sendbuf = GW_CLIENT_SO_RCVBUF;
|
||||||
setsockopt(c_sock, SOL_SOCKET, SO_RCVBUF, &sendbuf, optlen);
|
setsockopt(c_sock, SOL_SOCKET, SO_RCVBUF, &sendbuf, optlen);
|
||||||
setnonblocking(c_sock);
|
setnonblocking(c_sock);
|
||||||
|
|
||||||
|
@ -766,9 +766,9 @@ int gw_do_connect_to_backend(
|
|||||||
/* prepare for connect */
|
/* prepare for connect */
|
||||||
setipaddress(&serv_addr.sin_addr, host);
|
setipaddress(&serv_addr.sin_addr, host);
|
||||||
serv_addr.sin_port = htons(port);
|
serv_addr.sin_port = htons(port);
|
||||||
bufsize = GW_CLIENT_SO_SNDBUF;
|
bufsize = GW_BACKEND_SO_SNDBUF;
|
||||||
setsockopt(so, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize));
|
setsockopt(so, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize));
|
||||||
bufsize = GW_CLIENT_SO_RCVBUF;
|
bufsize = GW_BACKEND_SO_RCVBUF;
|
||||||
setsockopt(so, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize));
|
setsockopt(so, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize));
|
||||||
/* set socket to as non-blocking here */
|
/* set socket to as non-blocking here */
|
||||||
setnonblocking(so);
|
setnonblocking(so);
|
||||||
|
@ -172,6 +172,7 @@ int i;
|
|||||||
|
|
||||||
inst->low_water = DEF_LOW_WATER;
|
inst->low_water = DEF_LOW_WATER;
|
||||||
inst->high_water = DEF_HIGH_WATER;
|
inst->high_water = DEF_HIGH_WATER;
|
||||||
|
inst->initbinlog = 0;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We only support one server behind this router, since the server is
|
* We only support one server behind this router, since the server is
|
||||||
@ -244,6 +245,10 @@ int i;
|
|||||||
{
|
{
|
||||||
inst->fileroot = strdup(value);
|
inst->fileroot = strdup(value);
|
||||||
}
|
}
|
||||||
|
else if (strcmp(options[i], "initialfile") == 0)
|
||||||
|
{
|
||||||
|
inst->initbinlog = atoi(value);
|
||||||
|
}
|
||||||
else if (strcmp(options[i], "lowwater") == 0)
|
else if (strcmp(options[i], "lowwater") == 0)
|
||||||
{
|
{
|
||||||
inst->low_water = atoi(value);
|
inst->low_water = atoi(value);
|
||||||
@ -450,11 +455,14 @@ ROUTER_SLAVE *slave = (ROUTER_SLAVE *)router_session;
|
|||||||
* TODO: Handle closure of master session
|
* TODO: Handle closure of master session
|
||||||
*/
|
*/
|
||||||
LOGIF(LE, (skygw_log_write_flush(
|
LOGIF(LE, (skygw_log_write_flush(
|
||||||
LOGFILE_ERROR, "Binlog router close session with master")));
|
LOGFILE_ERROR,
|
||||||
|
"Binlog router close session with master server %s",
|
||||||
|
router->service->databases->unique_name)));
|
||||||
blr_master_reconnect(router);
|
blr_master_reconnect(router);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
CHK_CLIENT_RSES(slave);
|
CHK_CLIENT_RSES(slave);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Lock router client session for secure read and update.
|
* Lock router client session for secure read and update.
|
||||||
*/
|
*/
|
||||||
|
@ -115,6 +115,10 @@ struct dirent *dp;
|
|||||||
|
|
||||||
if (n == 0) // No binlog files found
|
if (n == 0) // No binlog files found
|
||||||
{
|
{
|
||||||
|
if (router->initbinlog)
|
||||||
|
sprintf(filename, BINLOG_NAMEFMT, router->fileroot,
|
||||||
|
router->initbinlog);
|
||||||
|
else
|
||||||
sprintf(filename, BINLOG_NAMEFMT, router->fileroot, 1);
|
sprintf(filename, BINLOG_NAMEFMT, router->fileroot, 1);
|
||||||
blr_file_create(router, filename);
|
blr_file_create(router, filename);
|
||||||
}
|
}
|
||||||
|
@ -105,7 +105,8 @@ GWBUF *buf;
|
|||||||
if ((router->master = dcb_connect(router->service->databases, router->session, BLR_PROTOCOL)) == NULL)
|
if ((router->master = dcb_connect(router->service->databases, router->session, BLR_PROTOCOL)) == NULL)
|
||||||
{
|
{
|
||||||
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
|
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
|
||||||
"Binlog router: failed to connect to master\n")));
|
"Binlog router: failed to connect to master server '%s'\n",
|
||||||
|
router->service->databases->unique_name)));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -828,7 +829,7 @@ char file[BINLOG_FNAMELEN+1];
|
|||||||
pos = extract_field(ptr+4, 32);
|
pos = extract_field(ptr+4, 32);
|
||||||
pos <<= 32;
|
pos <<= 32;
|
||||||
pos |= extract_field(ptr, 32);
|
pos |= extract_field(ptr, 32);
|
||||||
slen = len - 8;
|
slen = len - (8 + 4); // Allow for position and CRC
|
||||||
if (slen > BINLOG_FNAMELEN)
|
if (slen > BINLOG_FNAMELEN)
|
||||||
slen = BINLOG_FNAMELEN;
|
slen = BINLOG_FNAMELEN;
|
||||||
memcpy(file, ptr + 8, slen);
|
memcpy(file, ptr + 8, slen);
|
||||||
|
@ -95,7 +95,7 @@ blr_slave_request(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
atomic_add(&slave->stats.n_requests, 1);
|
slave->stats.n_requests++;
|
||||||
switch (MYSQL_COMMAND(queue))
|
switch (MYSQL_COMMAND(queue))
|
||||||
{
|
{
|
||||||
case COM_QUERY:
|
case COM_QUERY:
|
||||||
@ -796,7 +796,7 @@ doitagain:
|
|||||||
slave->binlogfile)));
|
slave->binlogfile)));
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
atomic_add(&slave->stats.n_bursts, 1);
|
slave->stats.n_bursts++;
|
||||||
spinlock_acquire(&slave->catch_lock);
|
spinlock_acquire(&slave->catch_lock);
|
||||||
slave->cstate |= CS_INNERLOOP;
|
slave->cstate |= CS_INNERLOOP;
|
||||||
spinlock_release(&slave->catch_lock);
|
spinlock_release(&slave->catch_lock);
|
||||||
@ -830,7 +830,7 @@ if (hdr.event_size > DEF_HIGH_WATER) slave->stats.n_above++;
|
|||||||
slave->binlog_pos = hdr.next_pos;
|
slave->binlog_pos = hdr.next_pos;
|
||||||
}
|
}
|
||||||
rval = written;
|
rval = written;
|
||||||
atomic_add(&slave->stats.n_events, 1);
|
slave->stats.n_events++;
|
||||||
burst++;
|
burst++;
|
||||||
}
|
}
|
||||||
if (record == NULL)
|
if (record == NULL)
|
||||||
@ -843,7 +843,7 @@ if (hdr.event_size > DEF_HIGH_WATER) slave->stats.n_above++;
|
|||||||
} while (record && DCB_BELOW_LOW_WATER(slave->dcb));
|
} while (record && DCB_BELOW_LOW_WATER(slave->dcb));
|
||||||
if (record)
|
if (record)
|
||||||
{
|
{
|
||||||
atomic_add(&slave->stats.n_flows, 1);
|
slave->stats.n_flows++;
|
||||||
spinlock_acquire(&slave->catch_lock);
|
spinlock_acquire(&slave->catch_lock);
|
||||||
slave->cstate |= CS_EXPECTCB;
|
slave->cstate |= CS_EXPECTCB;
|
||||||
spinlock_release(&slave->catch_lock);
|
spinlock_release(&slave->catch_lock);
|
||||||
@ -854,7 +854,7 @@ if (hdr.event_size > DEF_HIGH_WATER) slave->stats.n_above++;
|
|||||||
spinlock_acquire(&slave->catch_lock);
|
spinlock_acquire(&slave->catch_lock);
|
||||||
if ((slave->cstate & CS_UPTODATE) == 0)
|
if ((slave->cstate & CS_UPTODATE) == 0)
|
||||||
{
|
{
|
||||||
atomic_add(&slave->stats.n_upd, 1);
|
slave->stats.n_upd++;
|
||||||
slave->cstate |= CS_UPTODATE;
|
slave->cstate |= CS_UPTODATE;
|
||||||
state_change = 1;
|
state_change = 1;
|
||||||
}
|
}
|
||||||
@ -907,7 +907,7 @@ ROUTER_INSTANCE *router = slave->router;
|
|||||||
if (slave->state == BLRS_DUMPING &&
|
if (slave->state == BLRS_DUMPING &&
|
||||||
slave->binlog_pos != router->binlog_position)
|
slave->binlog_pos != router->binlog_position)
|
||||||
{
|
{
|
||||||
atomic_add(&slave->stats.n_dcb, 1);
|
slave->stats.n_dcb++;
|
||||||
blr_slave_catchup(router, slave);
|
blr_slave_catchup(router, slave);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -916,12 +916,12 @@ ROUTER_INSTANCE *router = slave->router;
|
|||||||
{
|
{
|
||||||
if (slave->state == BLRS_DUMPING)
|
if (slave->state == BLRS_DUMPING)
|
||||||
{
|
{
|
||||||
atomic_add(&slave->stats.n_cb, 1);
|
slave->stats.n_cb++;
|
||||||
blr_slave_catchup(router, slave);
|
blr_slave_catchup(router, slave);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
atomic_add(&slave->stats.n_cbna, 1);
|
slave->stats.n_cbna++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
|
Reference in New Issue
Block a user