Merge branch 'release-1.0GA' into MAX-324
This commit is contained in:
@ -23,4 +23,7 @@ target_link_libraries(cli log_manager utils)
|
||||
install(TARGETS cli DESTINATION modules)
|
||||
|
||||
add_subdirectory(readwritesplit)
|
||||
if(BUILD_BINLOG)
|
||||
add_subdirectory(binlog)
|
||||
endif()
|
||||
|
||||
|
@ -59,6 +59,8 @@
|
||||
#include <mysql_client_server_protocol.h>
|
||||
|
||||
extern int lm_enabled_logfiles_bitmask;
|
||||
extern size_t log_ses_count[];
|
||||
extern __thread log_info_t tls_log_info;
|
||||
|
||||
static char *version_str = "V1.0.6";
|
||||
|
||||
@ -186,6 +188,8 @@ int i;
|
||||
inst->long_burst = DEF_LONG_BURST;
|
||||
inst->burst_size = DEF_BURST_SIZE;
|
||||
inst->retry_backoff = 1;
|
||||
inst->binlogdir = NULL;
|
||||
inst->heartbeat = 300; // Default is every 5 minutes
|
||||
|
||||
/*
|
||||
* We only support one server behind this router, since the server is
|
||||
@ -306,6 +310,14 @@ int i;
|
||||
inst->burst_size = size;
|
||||
|
||||
}
|
||||
else if (strcmp(options[i], "heartbeat") == 0)
|
||||
{
|
||||
inst->heartbeat = atoi(value);
|
||||
}
|
||||
else if (strcmp(options[i], "binlogdir") == 0)
|
||||
{
|
||||
inst->binlogdir = strdup(value);
|
||||
}
|
||||
else
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write(
|
||||
@ -416,6 +428,7 @@ ROUTER_SLAVE *slave;
|
||||
slave->router = inst;
|
||||
slave->file = NULL;
|
||||
strcpy(slave->binlogfile, "unassigned");
|
||||
slave->connect_time = time(0);
|
||||
|
||||
/**
|
||||
* Add this session to the list of active sessions.
|
||||
@ -509,9 +522,13 @@ ROUTER_SLAVE *slave = (ROUTER_SLAVE *)router_session;
|
||||
{
|
||||
/*
|
||||
* We must be closing the master session.
|
||||
*
|
||||
* TODO: Handle closure of master session
|
||||
*/
|
||||
LOGIF(LM, (skygw_log_write_flush(
|
||||
LOGFILE_MESSAGE,
|
||||
"%s: Master %s disconnected after %ld seconds. "
|
||||
"%d events read,",
|
||||
router->service->name, router->master->remote,
|
||||
time(0) - router->connect_time, router->stats.n_binlogs_ses)));
|
||||
LOGIF(LE, (skygw_log_write_flush(
|
||||
LOGFILE_ERROR,
|
||||
"Binlog router close session with master server %s",
|
||||
@ -529,6 +546,15 @@ ROUTER_SLAVE *slave = (ROUTER_SLAVE *)router_session;
|
||||
/* decrease server registered slaves counter */
|
||||
atomic_add(&router->stats.n_registered, -1);
|
||||
|
||||
LOGIF(LM, (skygw_log_write_flush(
|
||||
LOGFILE_MESSAGE,
|
||||
"%s: Slave %s, server id %d, disconnected after %ld seconds. "
|
||||
"%d events sent, %lu bytes.",
|
||||
router->service->name, slave->dcb->remote,
|
||||
slave->serverid,
|
||||
time(0) - slave->connect_time, slave->stats.n_events,
|
||||
slave->stats.n_bytes)));
|
||||
|
||||
/*
|
||||
* Mark the slave as unregistered to prevent the forwarding
|
||||
* of any more binlog records to this slave.
|
||||
@ -641,25 +667,29 @@ struct tm tm;
|
||||
min5 /= 5.0;
|
||||
|
||||
|
||||
dcb_printf(dcb, "\tMaster connection DCB: %p\n",
|
||||
dcb_printf(dcb, "\tMaster connection DCB: %p\n",
|
||||
router_inst->master);
|
||||
dcb_printf(dcb, "\tMaster connection state: %s\n",
|
||||
dcb_printf(dcb, "\tMaster connection state: %s\n",
|
||||
blrm_states[router_inst->master_state]);
|
||||
|
||||
localtime_r(&router_inst->stats.lastReply, &tm);
|
||||
asctime_r(&tm, buf);
|
||||
|
||||
dcb_printf(dcb, "\tNumber of master connects: %d\n",
|
||||
dcb_printf(dcb, "\tBinlog directory: %s\n",
|
||||
router_inst->binlogdir);
|
||||
dcb_printf(dcb, "\tNumber of master connects: %d\n",
|
||||
router_inst->stats.n_masterstarts);
|
||||
dcb_printf(dcb, "\tNumber of delayed reconnects: %d\n",
|
||||
dcb_printf(dcb, "\tNumber of delayed reconnects: %d\n",
|
||||
router_inst->stats.n_delayedreconnects);
|
||||
dcb_printf(dcb, "\tCurrent binlog file: %s\n",
|
||||
dcb_printf(dcb, "\tCurrent binlog file: %s\n",
|
||||
router_inst->binlog_name);
|
||||
dcb_printf(dcb, "\tCurrent binlog position: %u\n",
|
||||
dcb_printf(dcb, "\tCurrent binlog position: %u\n",
|
||||
router_inst->binlog_position);
|
||||
dcb_printf(dcb, "\tNumber of slave servers: %u\n",
|
||||
dcb_printf(dcb, "\tNumber of slave servers: %u\n",
|
||||
router_inst->stats.n_slaves);
|
||||
dcb_printf(dcb, "\tNumber of binlog events received: %u\n",
|
||||
dcb_printf(dcb, "\tNo. of binlog events received this session: %u\n",
|
||||
router_inst->stats.n_binlogs_ses);
|
||||
dcb_printf(dcb, "\tTotal no. of binlog events received: %u\n",
|
||||
router_inst->stats.n_binlogs);
|
||||
minno = router_inst->stats.minno - 1;
|
||||
if (minno == -1)
|
||||
@ -668,28 +698,31 @@ struct tm tm;
|
||||
dcb_printf(dcb, "\tCurrent 5 10 15 30 Min Avg\n");
|
||||
dcb_printf(dcb, "\t %6d %8.1f %8.1f %8.1f %8.1f\n",
|
||||
router_inst->stats.minavgs[minno], min5, min10, min15, min30);
|
||||
dcb_printf(dcb, "\tNumber of fake binlog events: %u\n",
|
||||
dcb_printf(dcb, "\tNumber of fake binlog events: %u\n",
|
||||
router_inst->stats.n_fakeevents);
|
||||
dcb_printf(dcb, "\tNumber of artificial binlog events: %u\n",
|
||||
dcb_printf(dcb, "\tNumber of artificial binlog events: %u\n",
|
||||
router_inst->stats.n_artificial);
|
||||
dcb_printf(dcb, "\tNumber of binlog events in error: %u\n",
|
||||
dcb_printf(dcb, "\tNumber of binlog events in error: %u\n",
|
||||
router_inst->stats.n_binlog_errors);
|
||||
dcb_printf(dcb, "\tNumber of binlog rotate events: %u\n",
|
||||
dcb_printf(dcb, "\tNumber of binlog rotate events: %u\n",
|
||||
router_inst->stats.n_rotates);
|
||||
dcb_printf(dcb, "\tNumber of heartbeat events: %u\n",
|
||||
dcb_printf(dcb, "\tNumber of heartbeat events: %u\n",
|
||||
router_inst->stats.n_heartbeats);
|
||||
dcb_printf(dcb, "\tNumber of packets received: %u\n",
|
||||
dcb_printf(dcb, "\tNumber of packets received: %u\n",
|
||||
router_inst->stats.n_reads);
|
||||
dcb_printf(dcb, "\tNumber of residual data packets: %u\n",
|
||||
dcb_printf(dcb, "\tNumber of residual data packets: %u\n",
|
||||
router_inst->stats.n_residuals);
|
||||
dcb_printf(dcb, "\tAverage events per packet %.1f\n",
|
||||
dcb_printf(dcb, "\tAverage events per packet %.1f\n",
|
||||
(double)router_inst->stats.n_binlogs / router_inst->stats.n_reads);
|
||||
dcb_printf(dcb, "\tLast event from master at: %s",
|
||||
dcb_printf(dcb, "\tLast event from master at: %s",
|
||||
buf);
|
||||
dcb_printf(dcb, "\t (%d seconds ago)\n",
|
||||
time(0) - router_inst->stats.lastReply);
|
||||
dcb_printf(dcb, "\tLast event from master: 0x%x\n",
|
||||
router_inst->lastEventReceived);
|
||||
dcb_printf(dcb, "\tLast event from master: 0x%x (%s)\n",
|
||||
router_inst->lastEventReceived,
|
||||
(router_inst->lastEventReceived >= 0 &&
|
||||
router_inst->lastEventReceived < 0x24) ?
|
||||
event_names[router_inst->lastEventReceived] : "unknown");
|
||||
if (router_inst->active_logs)
|
||||
dcb_printf(dcb, "\tRouter processing binlog records\n");
|
||||
if (router_inst->reconnect_pending)
|
||||
@ -697,7 +730,7 @@ struct tm tm;
|
||||
dcb_printf(dcb, "\tEvents received:\n");
|
||||
for (i = 0; i < 0x24; i++)
|
||||
{
|
||||
dcb_printf(dcb, "\t\t%-38s: %u\n", event_names[i], router_inst->stats.events[i]);
|
||||
dcb_printf(dcb, "\t\t%-38s %u\n", event_names[i], router_inst->stats.events[i]);
|
||||
}
|
||||
|
||||
#if SPINLOCK_PROFILE
|
||||
@ -739,19 +772,44 @@ struct tm tm;
|
||||
min15 /= 15.0;
|
||||
min10 /= 10.0;
|
||||
min5 /= 5.0;
|
||||
dcb_printf(dcb, "\t\tServer-id: %d\n", session->serverid);
|
||||
dcb_printf(dcb,
|
||||
"\t\tServer-id: %d\n",
|
||||
session->serverid);
|
||||
if (session->hostname)
|
||||
dcb_printf(dcb, "\t\tHostname: %s\n", session->hostname);
|
||||
dcb_printf(dcb, "\t\tSlave DCB: %p\n", session->dcb);
|
||||
dcb_printf(dcb, "\t\tNext Sequence No: %d\n", session->seqno);
|
||||
dcb_printf(dcb, "\t\tState: %s\n", blrs_states[session->state]);
|
||||
dcb_printf(dcb, "\t\tBinlog file: %s\n", session->binlogfile);
|
||||
dcb_printf(dcb, "\t\tBinlog position: %u\n", session->binlog_pos);
|
||||
dcb_printf(dcb, "\t\tHostname: %s\n", session->hostname);
|
||||
dcb_printf(dcb,
|
||||
"\t\tSlave: %d\n",
|
||||
session->dcb->remote);
|
||||
dcb_printf(dcb,
|
||||
"\t\tSlave DCB: %p\n",
|
||||
session->dcb);
|
||||
dcb_printf(dcb,
|
||||
"\t\tNext Sequence No: %d\n",
|
||||
session->seqno);
|
||||
dcb_printf(dcb,
|
||||
"\t\tState: %s\n",
|
||||
blrs_states[session->state]);
|
||||
dcb_printf(dcb,
|
||||
"\t\tBinlog file: %s\n",
|
||||
session->binlogfile);
|
||||
dcb_printf(dcb,
|
||||
"\t\tBinlog position: %u\n",
|
||||
session->binlog_pos);
|
||||
if (session->nocrc)
|
||||
dcb_printf(dcb, "\t\tMaster Binlog CRC: None\n");
|
||||
dcb_printf(dcb, "\t\tNo. requests: %u\n", session->stats.n_requests);
|
||||
dcb_printf(dcb, "\t\tNo. events sent: %u\n", session->stats.n_events);
|
||||
dcb_printf(dcb, "\t\tNo. bursts sent: %u\n", session->stats.n_bursts);
|
||||
dcb_printf(dcb,
|
||||
"\t\tMaster Binlog CRC: None\n");
|
||||
dcb_printf(dcb,
|
||||
"\t\tNo. requests: %u\n",
|
||||
session->stats.n_requests);
|
||||
dcb_printf(dcb,
|
||||
"\t\tNo. events sent: %u\n",
|
||||
session->stats.n_events);
|
||||
dcb_printf(dcb,
|
||||
"\t\tNo. bursts sent: %u\n",
|
||||
session->stats.n_bursts);
|
||||
dcb_printf(dcb,
|
||||
"\t\tNo. transitions to follow mode: %u\n",
|
||||
session->stats.n_bursts);
|
||||
minno = session->stats.minno - 1;
|
||||
if (minno == -1)
|
||||
minno = 30;
|
||||
@ -760,15 +818,18 @@ struct tm tm;
|
||||
dcb_printf(dcb, "\t\t %6d %8.1f %8.1f %8.1f %8.1f\n",
|
||||
session->stats.minavgs[minno], min5, min10,
|
||||
min15, min30);
|
||||
dcb_printf(dcb, "\t\tNo. flow control: %u\n", session->stats.n_flows);
|
||||
dcb_printf(dcb, "\t\tNo. up to date: %u\n", session->stats.n_upd);
|
||||
dcb_printf(dcb, "\t\tNo. of drained cbs %u\n", session->stats.n_dcb);
|
||||
dcb_printf(dcb, "\t\tNo. of low water cbs N/A %u\n", session->stats.n_cbna);
|
||||
dcb_printf(dcb, "\t\tNo. of failed reads %u\n", session->stats.n_failed_read);
|
||||
dcb_printf(dcb, "\t\tNo. of nested distribute events %u\n", session->stats.n_overrun);
|
||||
dcb_printf(dcb, "\t\tNo. of distribute action 1 %u\n", session->stats.n_actions[0]);
|
||||
dcb_printf(dcb, "\t\tNo. of distribute action 2 %u\n", session->stats.n_actions[1]);
|
||||
dcb_printf(dcb, "\t\tNo. of distribute action 3 %u\n", session->stats.n_actions[2]);
|
||||
dcb_printf(dcb, "\t\tNo. flow control: %u\n", session->stats.n_flows);
|
||||
dcb_printf(dcb, "\t\tNo. up to date: %u\n", session->stats.n_upd);
|
||||
dcb_printf(dcb, "\t\tNo. of drained cbs %u\n", session->stats.n_dcb);
|
||||
dcb_printf(dcb, "\t\tNo. of failed reads %u\n", session->stats.n_failed_read);
|
||||
|
||||
#if DETAILED_DIAG
|
||||
dcb_printf(dcb, "\t\tNo. of nested distribute events %u\n", session->stats.n_overrun);
|
||||
dcb_printf(dcb, "\t\tNo. of distribute action 1 %u\n", session->stats.n_actions[0]);
|
||||
dcb_printf(dcb, "\t\tNo. of distribute action 2 %u\n", session->stats.n_actions[1]);
|
||||
dcb_printf(dcb, "\t\tNo. of distribute action 3 %u\n", session->stats.n_actions[2]);
|
||||
#endif
|
||||
|
||||
if ((session->cstate & CS_UPTODATE) == 0)
|
||||
{
|
||||
dcb_printf(dcb, "\t\tSlave is in catchup mode. %s%s\n",
|
||||
@ -793,7 +854,7 @@ struct tm tm;
|
||||
dcb_printf(dcb, "\tSpinlock statistics (rses_lock):\n");
|
||||
spinlock_stats(&session->rses_lock, spin_reporter, dcb);
|
||||
#endif
|
||||
dcb_printf(dcb, "\n");
|
||||
dcb_printf(dcb, "\t\t--------------------\n\n");
|
||||
session = session->next;
|
||||
}
|
||||
spinlock_release(&router_inst->lock);
|
||||
@ -822,6 +883,24 @@ ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance;
|
||||
router->stats.lastReply = time(0);
|
||||
}
|
||||
|
||||
static char *
|
||||
extract_message(GWBUF *errpkt)
|
||||
{
|
||||
char *rval;
|
||||
int len;
|
||||
|
||||
len = EXTRACT24(errpkt->start);
|
||||
if ((rval = (char *)malloc(len)) == NULL)
|
||||
return NULL;
|
||||
memcpy(rval, (char *)(errpkt->start) + 7, 6);
|
||||
rval[6] = ' ';
|
||||
memcpy(&rval[7], (char *)(errpkt->start) + 13, len - 8);
|
||||
rval[len-2] = 0;
|
||||
return rval;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Error Reply routine
|
||||
*
|
||||
@ -841,10 +920,10 @@ errorReply(ROUTER *instance, void *router_session, GWBUF *message, DCB *backend_
|
||||
{
|
||||
ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance;
|
||||
int error, len;
|
||||
char msg[85];
|
||||
char msg[85], *errmsg;
|
||||
|
||||
len = sizeof(error);
|
||||
if (getsockopt(router->master->fd, SOL_SOCKET, SO_ERROR, &error, &len) != 0)
|
||||
if (router->master && getsockopt(router->master->fd, SOL_SOCKET, SO_ERROR, &error, &len) == 0 && error != 0)
|
||||
{
|
||||
strerror_r(error, msg, 80);
|
||||
strcat(msg, " ");
|
||||
@ -852,10 +931,21 @@ char msg[85];
|
||||
else
|
||||
strcpy(msg, "");
|
||||
|
||||
errmsg = extract_message(message);
|
||||
LOGIF(LE, (skygw_log_write_flush(
|
||||
LOGFILE_ERROR, "Master connection '%s', %sattempting reconnect to master",
|
||||
message, msg)));
|
||||
LOGFILE_ERROR, "%s: Master connection error '%s' in state '%s', "
|
||||
"%sattempting reconnect to master",
|
||||
router->service->name, errmsg,
|
||||
blrm_states[router->master_state], msg)));
|
||||
if (errmsg)
|
||||
free(errmsg);
|
||||
*succp = true;
|
||||
LOGIF(LM, (skygw_log_write_flush(
|
||||
LOGFILE_MESSAGE,
|
||||
"%s: Master %s disconnected after %ld seconds. "
|
||||
"%d events read.",
|
||||
router->service->name, router->master->remote,
|
||||
time(0) - router->connect_time, router->stats.n_binlogs_ses)));
|
||||
blr_master_reconnect(router);
|
||||
}
|
||||
|
||||
|
@ -54,6 +54,8 @@
|
||||
|
||||
|
||||
extern int lm_enabled_logfiles_bitmask;
|
||||
extern size_t log_ses_count[];
|
||||
extern __thread log_info_t tls_log_info;
|
||||
|
||||
|
||||
/**
|
||||
|
@ -51,6 +51,8 @@
|
||||
#include <log_manager.h>
|
||||
|
||||
extern int lm_enabled_logfiles_bitmask;
|
||||
extern size_t log_ses_count[];
|
||||
extern __thread log_info_t tls_log_info;
|
||||
|
||||
|
||||
static void blr_file_create(ROUTER_INSTANCE *router, char *file);
|
||||
@ -75,18 +77,27 @@ int root_len, i;
|
||||
DIR *dirp;
|
||||
struct dirent *dp;
|
||||
|
||||
strcpy(path, "/usr/local/skysql/MaxScale");
|
||||
if ((ptr = getenv("MAXSCALE_HOME")) != NULL)
|
||||
if (router->binlogdir == NULL)
|
||||
{
|
||||
strcpy(path, ptr);
|
||||
strcpy(path, "/usr/local/skysql/MaxScale");
|
||||
if ((ptr = getenv("MAXSCALE_HOME")) != NULL)
|
||||
{
|
||||
strcpy(path, ptr);
|
||||
}
|
||||
strcat(path, "/");
|
||||
strcat(path, router->service->name);
|
||||
|
||||
if (access(path, R_OK) == -1)
|
||||
mkdir(path, 0777);
|
||||
|
||||
router->binlogdir = strdup(path);
|
||||
}
|
||||
if (access(router->binlogdir, R_OK) == -1)
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
|
||||
"%s: Unable to read the binlog directory %s.",
|
||||
router->service->name, router->binlogdir)));
|
||||
}
|
||||
strcat(path, "/");
|
||||
strcat(path, router->service->name);
|
||||
|
||||
if (access(path, R_OK) == -1)
|
||||
mkdir(path, 0777);
|
||||
|
||||
router->binlogdir = strdup(path);
|
||||
|
||||
/* First try to find a binlog file number by reading the directory */
|
||||
root_len = strlen(router->fileroot);
|
||||
@ -353,7 +364,7 @@ struct stat statb;
|
||||
"Short read when reading the header. "
|
||||
"Expected 19 bytes but got %d bytes. "
|
||||
"Binlog file is %s, position %d",
|
||||
file->binlogname, pos, n)));
|
||||
n, file->binlogname, pos)));
|
||||
break;
|
||||
}
|
||||
return NULL;
|
||||
@ -364,6 +375,17 @@ struct stat statb;
|
||||
hdr->event_size = extract_field(&hdbuf[9], 32);
|
||||
hdr->next_pos = EXTRACT32(&hdbuf[13]);
|
||||
hdr->flags = EXTRACT16(&hdbuf[17]);
|
||||
|
||||
if (hdr->event_type > MAX_EVENT_TYPE)
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
|
||||
"Invalid event type 0x%x. "
|
||||
"Binlog file is %s, position %d",
|
||||
hdr->event_type,
|
||||
file->binlogname, pos)));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (hdr->next_pos < pos && hdr->event_type != ROTATE_EVENT)
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
|
||||
|
@ -63,6 +63,8 @@
|
||||
|
||||
|
||||
extern int lm_enabled_logfiles_bitmask;
|
||||
extern size_t log_ses_count[];
|
||||
extern __thread log_info_t tls_log_info;
|
||||
|
||||
static GWBUF *blr_make_query(char *statement);
|
||||
static GWBUF *blr_make_registration(ROUTER_INSTANCE *router);
|
||||
@ -91,6 +93,18 @@ blr_start_master(ROUTER_INSTANCE *router)
|
||||
DCB *client;
|
||||
GWBUF *buf;
|
||||
|
||||
router->stats.n_binlogs_ses = 0;
|
||||
spinlock_acquire(&router->lock);
|
||||
if (router->master_state != BLRM_UNCONNECTED)
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
|
||||
"%s: Master Connect: Unexpected master state %s\n",
|
||||
router->service->name, blrm_states[router->master_state])));
|
||||
spinlock_release(&router->lock);
|
||||
return;
|
||||
}
|
||||
router->master_state = BLRM_CONNECTING;
|
||||
spinlock_release(&router->lock);
|
||||
if ((client = dcb_alloc(DCB_ROLE_INTERNAL)) == NULL)
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
|
||||
@ -98,6 +112,7 @@ GWBUF *buf;
|
||||
return;
|
||||
}
|
||||
router->client = client;
|
||||
client->state = DCB_STATE_POLLING; /* Fake the client is reading */
|
||||
client->data = CreateMySQLAuthData(router->user, router->password, "");
|
||||
if ((router->session = session_alloc(router->service, client)) == NULL)
|
||||
{
|
||||
@ -108,17 +123,27 @@ GWBUF *buf;
|
||||
client->session = router->session;
|
||||
if ((router->master = dcb_connect(router->service->databases, router->session, BLR_PROTOCOL)) == NULL)
|
||||
{
|
||||
char *name = malloc(strlen(router->service->name) + strlen(" Master") + 1);
|
||||
sprintf(name, "%s Master", router->service->name);
|
||||
hktask_oneshot(name, blr_start_master, router,
|
||||
BLR_MASTER_BACKOFF_TIME * router->retry_backoff++);
|
||||
char *name;
|
||||
if ((name = malloc(strlen(router->service->name)
|
||||
+ strlen(" Master") + 1)) != NULL)
|
||||
{
|
||||
sprintf(name, "%s Master", router->service->name);
|
||||
hktask_oneshot(name, blr_start_master, router,
|
||||
BLR_MASTER_BACKOFF_TIME * router->retry_backoff++);
|
||||
}
|
||||
if (router->retry_backoff > BLR_MAX_BACKOFF)
|
||||
router->retry_backoff = 1;
|
||||
router->retry_backoff = BLR_MAX_BACKOFF;
|
||||
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
|
||||
"Binlog router: failed to connect to master server '%s'",
|
||||
router->service->databases->unique_name)));
|
||||
return;
|
||||
}
|
||||
router->master->remote = strdup(router->service->databases->name);
|
||||
LOGIF(LM,(skygw_log_write(
|
||||
LOGFILE_MESSAGE,
|
||||
"%s: atempting to connect to master server %s.",
|
||||
router->service->name, router->master->remote)));
|
||||
router->connect_time = time(0);
|
||||
|
||||
if (setsockopt(router->master->fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive , sizeof(keepalive )))
|
||||
perror("setsockopt");
|
||||
@ -129,7 +154,6 @@ perror("setsockopt");
|
||||
router->master_state = BLRM_TIMESTAMP;
|
||||
|
||||
router->stats.n_masterstarts++;
|
||||
router->retry_backoff = 1;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -160,7 +184,27 @@ GWBUF *ptr;
|
||||
router->reconnect_pending = 0;
|
||||
router->active_logs = 0;
|
||||
spinlock_release(&router->lock);
|
||||
blr_start_master(router);
|
||||
if (router->master_state < BLRM_BINLOGDUMP)
|
||||
{
|
||||
char *name;
|
||||
|
||||
router->master_state = BLRM_UNCONNECTED;
|
||||
|
||||
if ((name = malloc(strlen(router->service->name)
|
||||
+ strlen(" Master")+1)) != NULL);
|
||||
{
|
||||
sprintf(name, "%s Master", router->service->name);
|
||||
hktask_oneshot(name, blr_start_master, router,
|
||||
BLR_MASTER_BACKOFF_TIME * router->retry_backoff++);
|
||||
}
|
||||
if (router->retry_backoff > BLR_MAX_BACKOFF)
|
||||
router->retry_backoff = BLR_MAX_BACKOFF;
|
||||
}
|
||||
else
|
||||
{
|
||||
router->master_state = BLRM_UNCONNECTED;
|
||||
blr_start_master(router);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -225,8 +269,9 @@ char query[128];
|
||||
if (router->master_state < 0 || router->master_state > BLRM_MAXSTATE)
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write(
|
||||
LOGFILE_ERROR, "Invalid master state machine state (%d) for binlog router.",
|
||||
router->master_state)));
|
||||
LOGFILE_ERROR,
|
||||
"Invalid master state machine state (%d) for binlog router.",
|
||||
router->master_state)));
|
||||
gwbuf_consume(buf, gwbuf_length(buf));
|
||||
spinlock_acquire(&router->lock);
|
||||
if (router->reconnect_pending)
|
||||
@ -234,6 +279,12 @@ char query[128];
|
||||
router->active_logs = 0;
|
||||
spinlock_release(&router->lock);
|
||||
atomic_add(&router->handling_threads, -1);
|
||||
LOGIF(LE, (skygw_log_write(
|
||||
LOGFILE_ERROR,
|
||||
"%s: Pending reconnect in state %s.",
|
||||
router->service->name,
|
||||
blrm_states[router->master_state]
|
||||
)));
|
||||
blr_restart_master(router);
|
||||
return;
|
||||
}
|
||||
@ -247,8 +298,11 @@ char query[128];
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write(
|
||||
LOGFILE_ERROR,
|
||||
"Received error: %d, %s from master during %s phase of the master state machine.",
|
||||
MYSQL_ERROR_CODE(buf), MYSQL_ERROR_MSG(buf), blrm_states[router->master_state]
|
||||
"%s: Received error: %d, %s from master during %s phase "
|
||||
"of the master state machine.",
|
||||
router->service->name,
|
||||
MYSQL_ERROR_CODE(buf), MYSQL_ERROR_MSG(buf),
|
||||
blrm_states[router->master_state]
|
||||
)));
|
||||
gwbuf_consume(buf, gwbuf_length(buf));
|
||||
spinlock_acquire(&router->lock);
|
||||
@ -272,12 +326,17 @@ char query[128];
|
||||
buf = blr_make_query("SHOW VARIABLES LIKE 'SERVER_ID'");
|
||||
router->master_state = BLRM_SERVERID;
|
||||
router->master->func.write(router->master, buf);
|
||||
router->retry_backoff = 1;
|
||||
break;
|
||||
case BLRM_SERVERID:
|
||||
// Response to fetch of master's server-id
|
||||
router->saved_master.server_id = buf;
|
||||
// TODO: Extract the value of server-id and place in router->master_id
|
||||
buf = blr_make_query("SET @master_heartbeat_period = 1799999979520");
|
||||
{
|
||||
char str[80];
|
||||
sprintf(str, "SET @master_heartbeat_period = %lu000000000", router->heartbeat);
|
||||
buf = blr_make_query(str);
|
||||
}
|
||||
router->master_state = BLRM_HBPERIOD;
|
||||
router->master->func.write(router->master, buf);
|
||||
break;
|
||||
@ -357,6 +416,12 @@ char query[128];
|
||||
buf = blr_make_binlog_dump(router);
|
||||
router->master_state = BLRM_BINLOGDUMP;
|
||||
router->master->func.write(router->master, buf);
|
||||
LOGIF(LM,(skygw_log_write(
|
||||
LOGFILE_MESSAGE,
|
||||
"%s: Request binlog records from %s at "
|
||||
"position %d from master server %s.",
|
||||
router->service->name, router->binlog_name,
|
||||
router->binlog_position, router->master->remote)));
|
||||
break;
|
||||
case BLRM_BINLOGDUMP:
|
||||
// Main body, we have received a binlog record from the master
|
||||
@ -618,133 +683,172 @@ static REP_HEADER phdr;
|
||||
n_bufs = 1;
|
||||
}
|
||||
|
||||
blr_extract_header(ptr, &hdr);
|
||||
|
||||
if (hdr.event_size != len - 5)
|
||||
if (len < BINLOG_EVENT_HDR_LEN)
|
||||
{
|
||||
LOGIF(LE,(skygw_log_write(
|
||||
LOGFILE_ERROR,
|
||||
"Packet length is %d, but event size is %d, "
|
||||
"binlog file %s position %d"
|
||||
"reslen is %d and preslen is %d, "
|
||||
"length of previous event %d. %s",
|
||||
len, hdr.event_size,
|
||||
router->binlog_name,
|
||||
router->binlog_position,
|
||||
reslen, preslen, prev_length,
|
||||
(prev_length == -1 ?
|
||||
(no_residual ? "No residual data from previous call" : "Residual data from previous call") : "")
|
||||
)));
|
||||
blr_log_packet(LOGFILE_ERROR, "Packet:", ptr, len);
|
||||
LOGIF(LE,(skygw_log_write(
|
||||
LOGFILE_ERROR,
|
||||
"This event (0x%x) was contained in %d GWBUFs, "
|
||||
"the previous events was contained in %d GWBUFs",
|
||||
router->lastEventReceived, n_bufs, pn_bufs)));
|
||||
if (msg)
|
||||
char *msg = "";
|
||||
|
||||
if (ptr[4] == 0xfe) /* EOF Packet */
|
||||
{
|
||||
free(msg);
|
||||
msg = NULL;
|
||||
msg = "end of file";
|
||||
}
|
||||
break;
|
||||
else if (ptr[4] == 0xff) /* EOF Packet */
|
||||
{
|
||||
msg = "error";
|
||||
}
|
||||
LOGIF(LM,(skygw_log_write(
|
||||
LOGFILE_MESSAGE,
|
||||
"Non-event message (%s) from master.",
|
||||
msg)));
|
||||
}
|
||||
phdr = hdr;
|
||||
if (hdr.ok == 0)
|
||||
else
|
||||
{
|
||||
router->stats.n_binlogs++;
|
||||
router->stats.n_binlogs_ses++;
|
||||
router->lastEventReceived = hdr.event_type;
|
||||
|
||||
blr_extract_header(ptr, &hdr);
|
||||
|
||||
if (hdr.event_size != len - 5)
|
||||
{
|
||||
LOGIF(LE,(skygw_log_write(
|
||||
LOGFILE_ERROR,
|
||||
"Packet length is %d, but event size is %d, "
|
||||
"binlog file %s position %d "
|
||||
"reslen is %d and preslen is %d, "
|
||||
"length of previous event %d. %s",
|
||||
len, hdr.event_size,
|
||||
router->binlog_name,
|
||||
router->binlog_position,
|
||||
reslen, preslen, prev_length,
|
||||
(prev_length == -1 ?
|
||||
(no_residual ? "No residual data from previous call" : "Residual data from previous call") : "")
|
||||
)));
|
||||
blr_log_packet(LOGFILE_ERROR, "Packet:", ptr, len);
|
||||
LOGIF(LE,(skygw_log_write(
|
||||
LOGFILE_ERROR,
|
||||
"This event (0x%x) was contained in %d GWBUFs, "
|
||||
"the previous events was contained in %d GWBUFs",
|
||||
router->lastEventReceived, n_bufs, pn_bufs)));
|
||||
if (msg)
|
||||
{
|
||||
free(msg);
|
||||
msg = NULL;
|
||||
}
|
||||
break;
|
||||
}
|
||||
phdr = hdr;
|
||||
if (hdr.ok == 0)
|
||||
{
|
||||
router->stats.n_binlogs++;
|
||||
router->lastEventReceived = hdr.event_type;
|
||||
|
||||
// #define SHOW_EVENTS
|
||||
#ifdef SHOW_EVENTS
|
||||
printf("blr: event type 0x%02x, flags 0x%04x, event size %d", hdr.event_type, hdr.flags, hdr.event_size);
|
||||
printf("blr: event type 0x%02x, flags 0x%04x, event size %d", hdr.event_type, hdr.flags, hdr.event_size);
|
||||
#endif
|
||||
if (hdr.event_type >= 0 && hdr.event_type < 0x24)
|
||||
router->stats.events[hdr.event_type]++;
|
||||
if (hdr.event_type == FORMAT_DESCRIPTION_EVENT && hdr.next_pos == 0)
|
||||
{
|
||||
// Fake format description message
|
||||
LOGIF(LD,(skygw_log_write(LOGFILE_DEBUG,
|
||||
"Replication fake event. "
|
||||
"Binlog %s @ %d.",
|
||||
router->binlog_name,
|
||||
router->binlog_position)));
|
||||
router->stats.n_fakeevents++;
|
||||
if (hdr.event_type == FORMAT_DESCRIPTION_EVENT)
|
||||
if (hdr.event_type >= 0 && hdr.event_type < 0x24)
|
||||
router->stats.events[hdr.event_type]++;
|
||||
if (hdr.event_type == FORMAT_DESCRIPTION_EVENT && hdr.next_pos == 0)
|
||||
{
|
||||
/*
|
||||
* We need to save this to replay to new
|
||||
* slaves that attach later.
|
||||
*/
|
||||
if (router->saved_master.fde_event)
|
||||
free(router->saved_master.fde_event);
|
||||
router->saved_master.fde_len = hdr.event_size;
|
||||
router->saved_master.fde_event = malloc(hdr.event_size);
|
||||
if (router->saved_master.fde_event)
|
||||
memcpy(router->saved_master.fde_event,
|
||||
ptr + 5, hdr.event_size);
|
||||
// Fake format description message
|
||||
LOGIF(LD,(skygw_log_write(LOGFILE_DEBUG,
|
||||
"Replication fake event. "
|
||||
"Binlog %s @ %d.",
|
||||
router->binlog_name,
|
||||
router->binlog_position)));
|
||||
router->stats.n_fakeevents++;
|
||||
if (hdr.event_type == FORMAT_DESCRIPTION_EVENT)
|
||||
{
|
||||
uint8_t *new_fde;
|
||||
unsigned int new_fde_len;
|
||||
/*
|
||||
* We need to save this to replay to new
|
||||
* slaves that attach later.
|
||||
*/
|
||||
new_fde_len = hdr.event_size;
|
||||
new_fde = malloc(hdr.event_size);
|
||||
if (new_fde)
|
||||
{
|
||||
memcpy(new_fde, ptr + 5, hdr.event_size);
|
||||
if (router->saved_master.fde_event)
|
||||
free(router->saved_master.fde_event);
|
||||
router->saved_master.fde_event = new_fde;
|
||||
router->saved_master.fde_len = new_fde_len;
|
||||
}
|
||||
else
|
||||
{
|
||||
LOGIF(LE,(skygw_log_write(LOGFILE_ERROR,
|
||||
"%s: Received a format description "
|
||||
"event that MaxScale was unable to "
|
||||
"record. Event length is %d.",
|
||||
router->service->name,
|
||||
hdr.event_size)));
|
||||
blr_log_packet(LOGFILE_ERROR,
|
||||
"Format Description Event:", ptr, len);
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (hdr.event_type == HEARTBEAT_EVENT)
|
||||
{
|
||||
#ifdef SHOW_EVENTS
|
||||
printf("Replication heartbeat\n");
|
||||
#endif
|
||||
LOGIF(LD,(skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"Replication heartbeat. "
|
||||
"Binlog %s @ %d.",
|
||||
router->binlog_name,
|
||||
router->binlog_position)));
|
||||
router->stats.n_heartbeats++;
|
||||
}
|
||||
else if (hdr.flags != LOG_EVENT_ARTIFICIAL_F)
|
||||
{
|
||||
ptr = ptr + 5; // We don't put the first byte of the payload
|
||||
// into the binlog file
|
||||
if (hdr.event_type == ROTATE_EVENT)
|
||||
router->rotating = 1;
|
||||
blr_write_binlog_record(router, &hdr, ptr);
|
||||
if (hdr.event_type == ROTATE_EVENT)
|
||||
{
|
||||
blr_rotate_event(router, ptr, &hdr);
|
||||
}
|
||||
blr_distribute_binlog_record(router, &hdr, ptr);
|
||||
}
|
||||
else
|
||||
{
|
||||
router->stats.n_artificial++;
|
||||
LOGIF(LD,(skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"Artificial event not written "
|
||||
"to disk or distributed. "
|
||||
"Type 0x%x, Length %d, Binlog "
|
||||
"%s @ %d.",
|
||||
hdr.event_type,
|
||||
hdr.event_size,
|
||||
router->binlog_name,
|
||||
router->binlog_position)));
|
||||
ptr += 5;
|
||||
if (hdr.event_type == ROTATE_EVENT)
|
||||
{
|
||||
router->rotating = 1;
|
||||
blr_rotate_event(router, ptr, &hdr);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (hdr.event_type == HEARTBEAT_EVENT)
|
||||
{
|
||||
#ifdef SHOW_EVENTS
|
||||
printf("Replication heartbeat\n");
|
||||
#endif
|
||||
LOGIF(LD,(skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"Replication heartbeat. "
|
||||
"Binlog %s @ %d.",
|
||||
router->binlog_name,
|
||||
router->binlog_position)));
|
||||
router->stats.n_heartbeats++;
|
||||
}
|
||||
else if (hdr.flags != LOG_EVENT_ARTIFICIAL_F)
|
||||
{
|
||||
ptr = ptr + 5; // We don't put the first byte of the payload
|
||||
// into the binlog file
|
||||
if (hdr.event_type == ROTATE_EVENT)
|
||||
router->rotating = 1;
|
||||
blr_write_binlog_record(router, &hdr, ptr);
|
||||
if (hdr.event_type == ROTATE_EVENT)
|
||||
{
|
||||
blr_rotate_event(router, ptr, &hdr);
|
||||
}
|
||||
blr_distribute_binlog_record(router, &hdr, ptr);
|
||||
}
|
||||
else
|
||||
{
|
||||
router->stats.n_artificial++;
|
||||
LOGIF(LD,(skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"Artificial event not written "
|
||||
"to disk or distributed. "
|
||||
"Type 0x%x, Length %d, Binlog "
|
||||
"%s @ %d.",
|
||||
hdr.event_type,
|
||||
hdr.event_size,
|
||||
router->binlog_name,
|
||||
router->binlog_position)));
|
||||
ptr += 5;
|
||||
if (hdr.event_type == ROTATE_EVENT)
|
||||
{
|
||||
router->rotating = 1;
|
||||
blr_rotate_event(router, ptr, &hdr);
|
||||
}
|
||||
}
|
||||
LOGIF(LE,(skygw_log_write(LOGFILE_ERROR,
|
||||
"Error packet in binlog stream.%s @ %d.",
|
||||
router->binlog_name,
|
||||
router->binlog_position)));
|
||||
blr_log_packet(LOGFILE_ERROR, "Error Packet:",
|
||||
ptr, len);
|
||||
router->stats.n_binlog_errors++;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
printf("Binlog router error: %s\n", &ptr[7]);
|
||||
LOGIF(LE,(skygw_log_write(LOGFILE_ERROR,
|
||||
"Error packet in binlog stream.%s @ %d.",
|
||||
router->binlog_name,
|
||||
router->binlog_position)));
|
||||
blr_log_packet(LOGFILE_ERROR, "Error Packet:",
|
||||
ptr, len);
|
||||
router->stats.n_binlog_errors++;
|
||||
}
|
||||
|
||||
if (msg)
|
||||
{
|
||||
@ -968,6 +1072,7 @@ int action;
|
||||
{
|
||||
blr_slave_rotate(slave, ptr);
|
||||
}
|
||||
slave->stats.n_bytes += gwbuf_length(pkt);
|
||||
slave->dcb->func.write(slave->dcb, pkt);
|
||||
if (hdr->event_type != ROTATE_EVENT)
|
||||
{
|
||||
|
@ -65,8 +65,11 @@ int blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large);
|
||||
uint8_t *blr_build_header(GWBUF *pkt, REP_HEADER *hdr);
|
||||
int blr_slave_callback(DCB *dcb, DCB_REASON reason, void *data);
|
||||
static int blr_slave_fake_rotate(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
|
||||
static void blr_slave_send_fde(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
|
||||
|
||||
extern int lm_enabled_logfiles_bitmask;
|
||||
extern size_t log_ses_count[];
|
||||
extern __thread log_info_t tls_log_info;
|
||||
|
||||
/**
|
||||
* Process a request packet from the slave server.
|
||||
@ -544,29 +547,8 @@ uint32_t chksum;
|
||||
rval = slave->dcb->func.write(slave->dcb, resp);
|
||||
|
||||
/* Send the FORMAT_DESCRIPTION_EVENT */
|
||||
if (router->saved_master.fde_event)
|
||||
{
|
||||
resp = gwbuf_alloc(router->saved_master.fde_len + 5);
|
||||
ptr = GWBUF_DATA(resp);
|
||||
encode_value(ptr, router->saved_master.fde_len + 1, 24); // Payload length
|
||||
ptr += 3;
|
||||
*ptr++ = slave->seqno++;
|
||||
*ptr++ = 0; // OK
|
||||
memcpy(ptr, router->saved_master.fde_event, router->saved_master.fde_len);
|
||||
encode_value(ptr, time(0), 32); // Overwrite timestamp
|
||||
/*
|
||||
* Since we have changed the timestamp we must recalculate the CRC
|
||||
*
|
||||
* Position ptr to the start of the event header,
|
||||
* calculate a new checksum
|
||||
* and write it into the header
|
||||
*/
|
||||
ptr = GWBUF_DATA(resp) + 5 + router->saved_master.fde_len - 4;
|
||||
chksum = crc32(0L, NULL, 0);
|
||||
chksum = crc32(chksum, GWBUF_DATA(resp) + 5, router->saved_master.fde_len - 4);
|
||||
encode_value(ptr, chksum, 32);
|
||||
rval = slave->dcb->func.write(slave->dcb, resp);
|
||||
}
|
||||
if (slave->binlog_pos != 4)
|
||||
blr_slave_send_fde(router, slave);
|
||||
|
||||
slave->dcb->low_water = router->low_water;
|
||||
slave->dcb->high_water = router->high_water;
|
||||
@ -575,8 +557,9 @@ uint32_t chksum;
|
||||
|
||||
LOGIF(LM, (skygw_log_write(
|
||||
LOGFILE_MESSAGE,
|
||||
"%s: New slave %s requested binlog file %s from position %lu",
|
||||
"%s: New slave %s, server id %d, requested binlog file %s from position %lu",
|
||||
router->service->name, slave->dcb->remote,
|
||||
slave->serverid,
|
||||
slave->binlogfile, slave->binlog_pos)));
|
||||
|
||||
if (slave->binlog_pos != router->binlog_position ||
|
||||
@ -782,6 +765,7 @@ if (hkheartbeat - beat1 > 1) LOGIF(LE, (skygw_log_write(
|
||||
LOGFILE_ERROR, "blr_open_binlog took %d beats",
|
||||
hkheartbeat - beat1)));
|
||||
}
|
||||
slave->stats.n_bytes += gwbuf_length(head);
|
||||
written = slave->dcb->func.write(slave->dcb, head);
|
||||
if (written && hdr.event_type != ROTATE_EVENT)
|
||||
{
|
||||
@ -839,11 +823,23 @@ if (hkheartbeat - beat1 > 1) LOGIF(LE, (skygw_log_write(
|
||||
|
||||
if (state_change)
|
||||
{
|
||||
LOGIF(LM, (skygw_log_write(LOGFILE_MESSAGE,
|
||||
"%s: Slave %s is up to date %s, %u.",
|
||||
slave->stats.n_caughtup++;
|
||||
if (slave->stats.n_caughtup == 1)
|
||||
{
|
||||
LOGIF(LM, (skygw_log_write(LOGFILE_MESSAGE,
|
||||
"%s: Slave %s is up to date %s, %u.",
|
||||
router->service->name,
|
||||
slave->dcb->remote,
|
||||
slave->binlogfile, slave->binlog_pos)));
|
||||
}
|
||||
else if ((slave->stats.n_caughtup % 50) == 0)
|
||||
{
|
||||
LOGIF(LM, (skygw_log_write(LOGFILE_MESSAGE,
|
||||
"%s: Slave %s is up to date %s, %u.",
|
||||
router->service->name,
|
||||
slave->dcb->remote,
|
||||
slave->binlogfile, slave->binlog_pos)));
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
@ -1031,3 +1027,51 @@ uint32_t chksum;
|
||||
slave->dcb->func.write(slave->dcb, resp);
|
||||
return 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a "fake" format description event to the newly connected slave
|
||||
*
|
||||
* @param router The router instance
|
||||
* @param slave The slave to send the event to
|
||||
*/
|
||||
static void
|
||||
blr_slave_send_fde(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave)
|
||||
{
|
||||
BLFILE *file;
|
||||
REP_HEADER hdr;
|
||||
GWBUF *record, *head;
|
||||
uint8_t *ptr;
|
||||
uint32_t chksum;
|
||||
|
||||
if ((file = blr_open_binlog(router, slave->binlogfile)) == NULL)
|
||||
return;
|
||||
if ((record = blr_read_binlog(router, file, 4, &hdr)) == NULL)
|
||||
{
|
||||
blr_close_binlog(router, file);
|
||||
return;
|
||||
}
|
||||
blr_close_binlog(router, file);
|
||||
head = gwbuf_alloc(5);
|
||||
ptr = GWBUF_DATA(head);
|
||||
encode_value(ptr, hdr.event_size + 1, 24); // Payload length
|
||||
ptr += 3;
|
||||
*ptr++ = slave->seqno++;
|
||||
*ptr++ = 0; // OK
|
||||
head = gwbuf_append(head, record);
|
||||
ptr = GWBUF_DATA(record);
|
||||
encode_value(ptr, time(0), 32); // Overwrite timestamp
|
||||
ptr += 13;
|
||||
encode_value(ptr, 0, 32); // Set next position to 0
|
||||
/*
|
||||
* Since we have changed the timestamp we must recalculate the CRC
|
||||
*
|
||||
* Position ptr to the start of the event header,
|
||||
* calculate a new checksum
|
||||
* and write it into the header
|
||||
*/
|
||||
ptr = GWBUF_DATA(record) + hdr.event_size - 4;
|
||||
chksum = crc32(0L, NULL, 0);
|
||||
chksum = crc32(chksum, GWBUF_DATA(record), hdr.event_size - 4);
|
||||
encode_value(ptr, chksum, 32);
|
||||
slave->dcb->func.write(slave->dcb, head);
|
||||
}
|
||||
|
@ -48,7 +48,7 @@
|
||||
|
||||
MODULE_INFO info = {
|
||||
MODULE_API_ROUTER,
|
||||
MODULE_BETA_RELEASE,
|
||||
MODULE_GA,
|
||||
ROUTER_VERSION,
|
||||
"The admin user interface"
|
||||
};
|
||||
|
@ -47,7 +47,7 @@
|
||||
|
||||
MODULE_INFO info = {
|
||||
MODULE_API_ROUTER,
|
||||
MODULE_BETA_RELEASE,
|
||||
MODULE_GA,
|
||||
ROUTER_VERSION,
|
||||
"The debug user interface"
|
||||
};
|
||||
|
@ -96,7 +96,7 @@ extern __thread log_info_t tls_log_info;
|
||||
|
||||
MODULE_INFO info = {
|
||||
MODULE_API_ROUTER,
|
||||
MODULE_BETA_RELEASE,
|
||||
MODULE_GA,
|
||||
ROUTER_VERSION,
|
||||
"A connection based router to load balance based on connections"
|
||||
};
|
||||
|
@ -37,7 +37,7 @@
|
||||
|
||||
MODULE_INFO info = {
|
||||
MODULE_API_ROUTER,
|
||||
MODULE_BETA_RELEASE,
|
||||
MODULE_GA,
|
||||
ROUTER_VERSION,
|
||||
"A Read/Write splitting router for enhancement read scalability"
|
||||
};
|
||||
@ -313,8 +313,8 @@ static int hashkeyfun(
|
||||
}
|
||||
|
||||
static int hashcmpfun(
|
||||
void* v1,
|
||||
void* v2)
|
||||
void* v1,
|
||||
void* v2)
|
||||
{
|
||||
char* i1 = (char*) v1;
|
||||
char* i2 = (char*) v2;
|
||||
@ -374,6 +374,15 @@ ROUTER_OBJECT* GetModuleObject()
|
||||
return &MyObject;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Refresh the instance by hte given parameter value.
|
||||
*
|
||||
* @param router Router instance
|
||||
* @param singleparam Parameter fo be reloaded
|
||||
*
|
||||
* Note: this part is not done. Needs refactoring.
|
||||
*/
|
||||
static void refreshInstance(
|
||||
ROUTER_INSTANCE* router,
|
||||
CONFIG_PARAMETER* singleparam)
|
||||
@ -987,6 +996,14 @@ static void closeSession(
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* When router session is closed, freeSession can be called to free allocated
|
||||
* resources.
|
||||
*
|
||||
* @param router_instance The router instance the session belongs to
|
||||
* @param router_client_session Client session
|
||||
*
|
||||
*/
|
||||
static void freeSession(
|
||||
ROUTER* router_instance,
|
||||
void* router_client_session)
|
||||
@ -1212,25 +1229,27 @@ static bool get_dcb(
|
||||
* backend and update assign it to new candidate if
|
||||
* necessary.
|
||||
*/
|
||||
else if (SERVER_IS_SLAVE(b->backend_server) &&
|
||||
(max_rlag == MAX_RLAG_UNDEFINED ||
|
||||
else if (SERVER_IS_SLAVE(b->backend_server))
|
||||
{
|
||||
if (max_rlag == MAX_RLAG_UNDEFINED ||
|
||||
(b->backend_server->rlag != MAX_RLAG_NOT_AVAILABLE &&
|
||||
b->backend_server->rlag <= max_rlag)))
|
||||
{
|
||||
candidate_bref = check_candidate_bref(
|
||||
candidate_bref,
|
||||
&backend_ref[i],
|
||||
rses->rses_config.rw_slave_select_criteria);
|
||||
}
|
||||
else
|
||||
{
|
||||
LOGIF(LT, (skygw_log_write(
|
||||
LOGFILE_TRACE,
|
||||
"Server %s:%d is too much behind the "
|
||||
"master, %d s. and can't be chosen.",
|
||||
b->backend_server->name,
|
||||
b->backend_server->port,
|
||||
b->backend_server->rlag)));
|
||||
b->backend_server->rlag <= max_rlag))
|
||||
{
|
||||
candidate_bref = check_candidate_bref(
|
||||
candidate_bref,
|
||||
&backend_ref[i],
|
||||
rses->rses_config.rw_slave_select_criteria);
|
||||
}
|
||||
else
|
||||
{
|
||||
LOGIF(LT, (skygw_log_write(
|
||||
LOGFILE_TRACE,
|
||||
"Server %s:%d is too much behind the "
|
||||
"master, %d s. and can't be chosen.",
|
||||
b->backend_server->name,
|
||||
b->backend_server->port,
|
||||
b->backend_server->rlag)));
|
||||
}
|
||||
}
|
||||
} /*< for */
|
||||
/** Assign selected DCB's pointer value */
|
||||
@ -1474,7 +1493,7 @@ static route_target_t get_route_target (
|
||||
QUERY_IS_TYPE(qtype, QUERY_TYPE_UNKNOWN)));
|
||||
target = TARGET_MASTER;
|
||||
}
|
||||
#if defined(SS_DEBUG)
|
||||
#if defined(SS_EXTRA_DEBUG)
|
||||
LOGIF(LT, (skygw_log_write(
|
||||
LOGFILE_TRACE,
|
||||
"Selected target \"%s\"",
|
||||
@ -1643,10 +1662,10 @@ skygw_query_type_t is_read_tmp_table(
|
||||
* @param type The type of the query resolved so far
|
||||
*/
|
||||
void check_create_tmp_table(
|
||||
ROUTER* instance,
|
||||
void* router_session,
|
||||
GWBUF* querybuf,
|
||||
skygw_query_type_t type)
|
||||
ROUTER* instance,
|
||||
void* router_session,
|
||||
GWBUF* querybuf,
|
||||
skygw_query_type_t type)
|
||||
{
|
||||
|
||||
int klen = 0;
|
||||
@ -2132,14 +2151,14 @@ static int routeQuery(
|
||||
rlag_max);
|
||||
if (succp)
|
||||
{
|
||||
#if defined(SS_DEBUG)
|
||||
#if defined(SS_EXTRA_DEBUG)
|
||||
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE,
|
||||
"Found DCB for slave.")));
|
||||
#endif
|
||||
ss_dassert(get_bref_from_dcb(router_cli_ses, target_dcb) !=
|
||||
router_cli_ses->rses_master_ref);
|
||||
ss_dassert(get_root_master_bref(router_cli_ses) ==
|
||||
router_cli_ses->rses_master_ref);
|
||||
#endif
|
||||
atomic_add(&inst->stats.n_slave, 1);
|
||||
}
|
||||
else
|
||||
@ -2204,7 +2223,7 @@ static int routeQuery(
|
||||
|
||||
LOGIF(LT, (skygw_log_write(
|
||||
LOGFILE_TRACE,
|
||||
"Route query to %s\t%s:%d <",
|
||||
"Route query to %s \t%s:%d <",
|
||||
(SERVER_IS_MASTER(bref->bref_backend->backend_server) ?
|
||||
"master" : "slave"),
|
||||
bref->bref_backend->backend_server->name,
|
||||
@ -3946,12 +3965,12 @@ static bool route_session_write(
|
||||
{
|
||||
LOGIF(LT, (skygw_log_write(
|
||||
LOGFILE_TRACE,
|
||||
"Route query to %s\t%s:%d%s",
|
||||
"Route query to %s \t%s:%d%s",
|
||||
(SERVER_IS_MASTER(backend_ref[i].bref_backend->backend_server) ?
|
||||
"master" : "slave"),
|
||||
backend_ref[i].bref_backend->backend_server->name,
|
||||
backend_ref[i].bref_backend->backend_server->port,
|
||||
(i+1==router_cli_ses->rses_nbackends ? " <" : ""))));
|
||||
(i+1==router_cli_ses->rses_nbackends ? " <" : " "))));
|
||||
}
|
||||
|
||||
if (BREF_IS_IN_USE((&backend_ref[i])))
|
||||
@ -4001,12 +4020,12 @@ static bool route_session_write(
|
||||
{
|
||||
LOGIF(LT, (skygw_log_write(
|
||||
LOGFILE_TRACE,
|
||||
"Route query to %s\t%s:%d%s",
|
||||
"Route query to %s \t%s:%d%s",
|
||||
(SERVER_IS_MASTER(backend_ref[i].bref_backend->backend_server) ?
|
||||
"master" : "slave"),
|
||||
backend_ref[i].bref_backend->backend_server->name,
|
||||
backend_ref[i].bref_backend->backend_server->port,
|
||||
(i+1==router_cli_ses->rses_nbackends ? " <" : ""))));
|
||||
(i+1==router_cli_ses->rses_nbackends ? " <" : " "))));
|
||||
}
|
||||
|
||||
scur = backend_ref_get_sescmd_cursor(&backend_ref[i]);
|
||||
|
@ -1,4 +1,4 @@
|
||||
#!/bin/sh
|
||||
#!/bin/bash
|
||||
NARGS=7
|
||||
TLOG=$1
|
||||
THOST=$2
|
||||
|
Reference in New Issue
Block a user