Fixes for blr_salve performanc issues, linking of embedded library,
housekeeper heartbeat addition and unloading of modules to allow profiling of modules using LD_PROFILE environment variable and sprof
This commit is contained in:
parent
bce0716861
commit
9186d3fa90
@ -1633,6 +1633,7 @@ int main(int argc, char **argv)
|
||||
LOGFILE_MESSAGE,
|
||||
"MaxScale shutdown completed.")));
|
||||
|
||||
unload_all_modules();
|
||||
/* Remove Pidfile */
|
||||
unlink_pidfile();
|
||||
|
||||
|
@ -43,6 +43,7 @@ static HKTASK *tasks = NULL;
|
||||
static SPINLOCK tasklock = SPINLOCK_INIT;
|
||||
|
||||
static int do_shutdown = 0;
|
||||
unsigned long hkheartbeat = 0;
|
||||
|
||||
static void hkthread(void *);
|
||||
|
||||
@ -171,12 +172,17 @@ HKTASK *ptr;
|
||||
time_t now;
|
||||
void (*taskfn)(void *);
|
||||
void *taskdata;
|
||||
int i;
|
||||
|
||||
for (;;)
|
||||
{
|
||||
if (do_shutdown)
|
||||
return;
|
||||
thread_millisleep(1000);
|
||||
for (i = 0; i < 10; i++)
|
||||
{
|
||||
if (do_shutdown)
|
||||
return;
|
||||
thread_millisleep(100);
|
||||
hkheartbeat++;
|
||||
}
|
||||
now = time(0);
|
||||
spinlock_acquire(&tasklock);
|
||||
ptr = tasks;
|
||||
|
@ -332,6 +332,21 @@ MODULES *ptr;
|
||||
free(mod);
|
||||
}
|
||||
|
||||
/**
|
||||
* Unload all modules
|
||||
*
|
||||
* Remove all the modules from the system, called during shutdown
|
||||
* to allow termination hooks to be called.
|
||||
*/
|
||||
void
|
||||
unload_all_modules()
|
||||
{
|
||||
while (registered)
|
||||
{
|
||||
unregister_module(registered->module);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Print Modules
|
||||
*
|
||||
|
@ -125,6 +125,8 @@ MEMLOG *ptr;
|
||||
void
|
||||
memlog_log(MEMLOG *log, void *value)
|
||||
{
|
||||
if (!log)
|
||||
return;
|
||||
spinlock_acquire(&log->lock);
|
||||
switch (log->type)
|
||||
{
|
||||
|
@ -32,10 +32,14 @@
|
||||
#include <housekeeper.h>
|
||||
#include <mysql.h>
|
||||
|
||||
#define PROFILE_POLL 1
|
||||
#define PROFILE_POLL 0
|
||||
|
||||
#if PROFILE_POLL
|
||||
#include <rdtsc.h>
|
||||
#include <memlog.h>
|
||||
|
||||
extern unsigned long hkheartbeat;
|
||||
MEMLOG *plog;
|
||||
#endif
|
||||
|
||||
extern int lm_enabled_logfiles_bitmask;
|
||||
@ -143,7 +147,9 @@ static struct {
|
||||
int n_fds[MAXNFDS]; /*< Number of wakeups with particular
|
||||
n_fds value */
|
||||
int evq_length; /*< Event queue length */
|
||||
int evq_pending; /*< Number of pending descriptors in event queue */
|
||||
int evq_max; /*< Maximum event queue length */
|
||||
int wake_evqpending;/*< Woken from epoll_wait with pending events in queue */
|
||||
} pollStats;
|
||||
|
||||
/**
|
||||
@ -194,6 +200,9 @@ int i;
|
||||
for (i = 0; i < n_avg_samples; i++)
|
||||
avg_samples[i] = 0.0;
|
||||
|
||||
#if PROFILE_POLL
|
||||
plog = memlog_create("EventQueueWaitTime", ML_LONG, 10000);
|
||||
#endif
|
||||
}
|
||||
|
||||
/**
|
||||
@ -396,6 +405,8 @@ DCB *zombies = NULL;
|
||||
if (thread_data)
|
||||
thread_data[thread_id].state = THREAD_ZPROCESSING;
|
||||
zombies = dcb_process_zombies(thread_id);
|
||||
if (thread_data)
|
||||
thread_data[thread_id].state = THREAD_IDLE;
|
||||
}
|
||||
|
||||
atomic_add(&n_waiting, 1);
|
||||
@ -423,6 +434,7 @@ DCB *zombies = NULL;
|
||||
pthread_self(),
|
||||
nfds,
|
||||
eno)));
|
||||
atomic_add(&n_waiting, -1);
|
||||
}
|
||||
/*
|
||||
* If there are no new descriptors from the non-blocking call
|
||||
@ -431,11 +443,12 @@ DCB *zombies = NULL;
|
||||
*/
|
||||
else if (nfds == 0 && process_pollq(thread_id) == 0)
|
||||
{
|
||||
atomic_add(&n_waiting, 1);
|
||||
nfds = epoll_wait(epoll_fd,
|
||||
events,
|
||||
MAX_EVENTS,
|
||||
EPOLL_TIMEOUT);
|
||||
if (nfds == 0 && pollStats.evq_pending)
|
||||
atomic_add(&pollStats.wake_evqpending, 1);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -474,7 +487,7 @@ DCB *zombies = NULL;
|
||||
/*
|
||||
* Process every DCB that has a new event and add
|
||||
* it to the poll queue.
|
||||
* If the DCB is currently beign processed then we
|
||||
* If the DCB is currently being processed then we
|
||||
* or in the new eent bits to the pending event bits
|
||||
* and leave it in the queue.
|
||||
* If the DCB was not already in the queue then it was
|
||||
@ -489,6 +502,13 @@ DCB *zombies = NULL;
|
||||
spinlock_acquire(&pollqlock);
|
||||
if (DCB_POLL_BUSY(dcb))
|
||||
{
|
||||
if (dcb->evq.pending_events == 0)
|
||||
{
|
||||
pollStats.evq_pending++;
|
||||
#if PROFILE_POLL
|
||||
dcb->evq.inserted = hkheartbeat;
|
||||
#endif
|
||||
}
|
||||
dcb->evq.pending_events |= ev;
|
||||
}
|
||||
else
|
||||
@ -508,6 +528,10 @@ DCB *zombies = NULL;
|
||||
dcb->evq.next = dcb;
|
||||
}
|
||||
pollStats.evq_length++;
|
||||
pollStats.evq_pending++;
|
||||
#if PROFILE_POLL
|
||||
dcb->evq.inserted = hkheartbeat;
|
||||
#endif
|
||||
if (pollStats.evq_length > pollStats.evq_max)
|
||||
{
|
||||
pollStats.evq_max = pollStats.evq_length;
|
||||
@ -527,6 +551,10 @@ DCB *zombies = NULL;
|
||||
thread_data[thread_id].state = THREAD_ZPROCESSING;
|
||||
}
|
||||
zombies = dcb_process_zombies(thread_id);
|
||||
if (thread_data)
|
||||
{
|
||||
thread_data[thread_id].state = THREAD_IDLE;
|
||||
}
|
||||
}
|
||||
|
||||
if (do_shutdown)
|
||||
@ -608,12 +636,17 @@ uint32_t ev;
|
||||
{
|
||||
ev = dcb->evq.pending_events;
|
||||
dcb->evq.pending_events = 0;
|
||||
pollStats.evq_pending--;
|
||||
}
|
||||
spinlock_release(&pollqlock);
|
||||
|
||||
if (found == 0)
|
||||
return 0;
|
||||
|
||||
#if PROFILE_POLL
|
||||
memlog_log(plog, hkheartbeat - dcb->evq.inserted);
|
||||
#endif
|
||||
|
||||
|
||||
CHK_DCB(dcb);
|
||||
if (thread_data)
|
||||
@ -927,6 +960,10 @@ int i;
|
||||
pollStats.evq_length);
|
||||
dcb_printf(dcb, "Maximum event queue length: %d\n",
|
||||
pollStats.evq_max);
|
||||
dcb_printf(dcb, "Number of DCBs with pending events: %d\n",
|
||||
pollStats.evq_pending);
|
||||
dcb_printf(dcb, "Number of wakeups with pending queue: %d\n",
|
||||
pollStats.wake_evqpending);
|
||||
|
||||
dcb_printf(dcb, "No of poll completions with descriptors\n");
|
||||
dcb_printf(dcb, "\tNo. of descriptors\tNo. of poll completions.\n");
|
||||
@ -1127,6 +1164,13 @@ uint32_t ev = EPOLLOUT;
|
||||
spinlock_acquire(&pollqlock);
|
||||
if (DCB_POLL_BUSY(dcb))
|
||||
{
|
||||
if (dcb->evq.pending_events == 0)
|
||||
{
|
||||
pollStats.evq_pending++;
|
||||
#if PROFILE_POLL
|
||||
dcb->evq.inserted = hkheartbeat;
|
||||
#endif
|
||||
}
|
||||
dcb->evq.pending_events |= ev;
|
||||
}
|
||||
else
|
||||
@ -1146,6 +1190,10 @@ uint32_t ev = EPOLLOUT;
|
||||
dcb->evq.next = dcb;
|
||||
}
|
||||
pollStats.evq_length++;
|
||||
pollStats.evq_pending++;
|
||||
#if PROFILE_POLL
|
||||
dcb->evq.inserted = hkheartbeat;
|
||||
#endif
|
||||
if (pollStats.evq_length > pollStats.evq_max)
|
||||
{
|
||||
pollStats.evq_max = pollStats.evq_length;
|
||||
|
@ -21,7 +21,7 @@ LDFLAGS=-rdynamic -L$(LOGPATH) -L$(EMBEDDED_LIB) \
|
||||
LIBS= -lz -lm -lcrypt -lcrypto -ldl -laio -lrt -pthread -llog_manager \
|
||||
-L../../inih/extra -linih -lssl -lstdc++ -lmysqld
|
||||
|
||||
TESTS=testhash testspinlock testfilter testadminusers
|
||||
TESTS=testhash testspinlock testfilter testadminusers testmemlog
|
||||
|
||||
cleantests:
|
||||
- $(DEL) *.o
|
||||
@ -59,6 +59,13 @@ testadminusers: testadminusers.c libcore.a
|
||||
-I$(ROOT_PATH)/utils \
|
||||
testadminusers.c libcore.a $(UTILSPATH)/skygw_utils.o $(LIBS) -o testadminusers
|
||||
|
||||
testmemlog: testmemlog.c libcore.a
|
||||
$(CC) $(CFLAGS) $(LDFLAGS) \
|
||||
-I$(ROOT_PATH)/server/include \
|
||||
-I$(ROOT_PATH)/utils \
|
||||
testmemlog.c libcore.a $(UTILSPATH)/skygw_utils.o $(LIBS) -o testmemlog
|
||||
|
||||
|
||||
libcore.a: ../*.o
|
||||
ar rv libcore.a ../*.o
|
||||
|
||||
|
@ -98,12 +98,24 @@ typedef struct gw_protocol {
|
||||
int (*session)(struct dcb *, void *);
|
||||
} GWPROTOCOL;
|
||||
|
||||
/**
|
||||
* The event queue structure used in the polling loop to maintain a queue
|
||||
* of events that need to be processed for the DCB.
|
||||
*
|
||||
* next The next DCB in the event queue
|
||||
* prev The previous DCB in the event queue
|
||||
* pending_events The events that are pending processing
|
||||
* processing Flag to indicate the processing status of the DCB
|
||||
* eventqlock Spinlock to protect this structure
|
||||
* inserted Insertion time for logging purposes
|
||||
*/
|
||||
typedef struct {
|
||||
struct dcb *next;
|
||||
struct dcb *prev;
|
||||
uint32_t pending_events;
|
||||
int processing;
|
||||
SPINLOCK eventqlock;
|
||||
unsigned long inserted;
|
||||
} DCBEVENTQ;
|
||||
|
||||
/**
|
||||
|
@ -32,6 +32,8 @@
|
||||
* 13/06/13 Mark Riddoch Initial implementation
|
||||
* 08/07/13 Mark Riddoch Addition of monitor modules
|
||||
* 29/05/14 Mark Riddoch Addition of filter modules
|
||||
* 01/10/14 Mark Riddoch Addition of call to unload all modules on
|
||||
* shutdown
|
||||
* @endverbatim
|
||||
*/
|
||||
|
||||
@ -58,6 +60,7 @@ typedef struct modules {
|
||||
|
||||
extern void *load_module(const char *module, const char *type);
|
||||
extern void unload_module(const char *module);
|
||||
extern void unload_all_modules();
|
||||
extern void printModules();
|
||||
extern void dprintAllModules(DCB *);
|
||||
char* get_maxscale_home(void);
|
||||
|
@ -54,7 +54,7 @@
|
||||
* Default burst sizes for slave catchup
|
||||
*/
|
||||
#define DEF_SHORT_BURST 15
|
||||
#define DEF_LONG_BURST 2000
|
||||
#define DEF_LONG_BURST 500
|
||||
|
||||
/**
|
||||
* Some useful macros for examining the MySQL Response packets
|
||||
@ -366,6 +366,26 @@ static char *blrs_states[] = { "Created", "Unregistered", "Registered",
|
||||
#define LOG_EVENT_NO_FILTER_F 0x0100
|
||||
#define LOG_EVENT_MTS_ISOLATE_F 0x0200
|
||||
|
||||
/**
|
||||
* Macros to extract common fields
|
||||
*/
|
||||
#define INLINE_EXTRACT 1 /* Set to 0 for debug purposes */
|
||||
|
||||
#if INLINE_EXTRACT
|
||||
#define EXTRACT16(x) (*(uint8_t *)(x) | (*((uint8_t *)(x) + 1) << 8))
|
||||
#define EXTRACT24(x) (*(uint8_t *)(x) | \
|
||||
(*((uint8_t *)(x) + 1) << 8) | \
|
||||
(*((uint8_t *)(x) + 2) << 16))
|
||||
#define EXTRACT32(x) (*(uint8_t *)(x) | \
|
||||
(*((uint8_t *)(x) + 1) << 8) | \
|
||||
(*((uint8_t *)(x) + 2) << 16) | \
|
||||
(*((uint8_t *)(x) + 3) << 24))
|
||||
#else
|
||||
#define EXTRACT16(x) extract_field((x), 16)
|
||||
#define EXTRACT24(x) extract_field((x), 24)
|
||||
#define EXTRACT32(x) extract_field((x), 32)
|
||||
#endif
|
||||
|
||||
/*
|
||||
* Externals within the router
|
||||
*/
|
||||
|
@ -1,4 +1,4 @@
|
||||
add_library(binlogrouter SHARED blr.c blr_master.c blr_cache.c blr_slave.c blr_file.c)
|
||||
set_target_properties(binlogrouter PROPERTIES INSTALL_RPATH ${CMAKE_INSTALL_RPATH}:${CMAKE_INSTALL_PREFIX}/lib)
|
||||
target_link_libraries(binlogrouter ssl pthread log_manager ${EMBEDDED_LIB})
|
||||
target_link_libraries(binlogrouter ssl pthread log_manager)
|
||||
install(TARGETS binlogrouter DESTINATION modules)
|
||||
|
@ -31,10 +31,14 @@ CFLAGS=-c -fPIC -I/usr/include -I../../include -I../../../include \
|
||||
|
||||
include ../../../../makefile.inc
|
||||
|
||||
#LDFLAGS=-shared -L$(LOGPATH) -L$(QCLASSPATH) -L$(EMBEDDED_LIB) \
|
||||
# -Wl,-rpath,$(DEST)/lib \
|
||||
# -Wl,-rpath,$(LOGPATH) -Wl,-rpath,$(UTILSPATH) -Wl,-rpath,$(QCLASSPATH) \
|
||||
# -Wl,-rpath,$(EMBEDDED_LIB)
|
||||
LDFLAGS=-shared -L$(LOGPATH) -L$(QCLASSPATH) -L$(EMBEDDED_LIB) \
|
||||
-Wl,-rpath,$(DEST)/lib \
|
||||
-Wl,-rpath,$(LOGPATH) -Wl,-rpath,$(UTILSPATH) -Wl,-rpath,$(QCLASSPATH) \
|
||||
-Wl,-rpath,$(EMBEDDED_LIB)
|
||||
-Wl,-rpath,$(LOGPATH) -Wl,-rpath,$(UTILSPATH)
|
||||
|
||||
|
||||
SRCS=blr.c blr_master.c blr_cache.c blr_slave.c blr_file.c
|
||||
OBJ=$(SRCS:.c=.o)
|
||||
|
@ -173,6 +173,8 @@ int i;
|
||||
inst->service = service;
|
||||
spinlock_init(&inst->lock);
|
||||
|
||||
inst->binlog_fd = -1;
|
||||
|
||||
inst->low_water = DEF_LOW_WATER;
|
||||
inst->high_water = DEF_HIGH_WATER;
|
||||
inst->initbinlog = 0;
|
||||
@ -749,7 +751,7 @@ struct tm tm;
|
||||
dcb_printf(dcb, "\tSpinlock statistics (rses_lock):\n");
|
||||
spinlock_stats(&session->rses_lock, spin_reporter, dcb);
|
||||
#endif
|
||||
|
||||
dcb_printf(dcb, "\n");
|
||||
session = session->next;
|
||||
}
|
||||
spinlock_release(&router_inst->lock);
|
||||
|
@ -290,14 +290,25 @@ int n;
|
||||
/* Read the header information from the file */
|
||||
if ((n = read(fd, hdbuf, 19)) != 19)
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
|
||||
"Failed to read header for binlog entry, "
|
||||
"at %d (%s).\n", pos, strerror(errno))));
|
||||
if (n> 0 && n < 19)
|
||||
switch (n)
|
||||
{
|
||||
case 0:
|
||||
LOGIF(LD, (skygw_log_write(LOGFILE_DEBUG,
|
||||
"Reached end of binlog file at %d.\n",
|
||||
pos)));
|
||||
break;
|
||||
case -1:
|
||||
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
|
||||
"Failed to read binlog file at position %d"
|
||||
" (%s).\n", pos, strerror(errno))));
|
||||
break;
|
||||
default:
|
||||
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
|
||||
"Short read when reading the header. "
|
||||
"Expected 19 bytes got %d bytes.\n",
|
||||
n)));
|
||||
break;
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
hdr->timestamp = extract_field(hdbuf, 32);
|
||||
|
@ -515,23 +515,23 @@ static REP_HEADER phdr;
|
||||
/* Get the length of the packet from the residual and new packet */
|
||||
if (reslen >= 3)
|
||||
{
|
||||
len = extract_field(pdata, 24);
|
||||
len = EXTRACT24(pdata);
|
||||
}
|
||||
else if (reslen == 2)
|
||||
{
|
||||
len = extract_field(pdata, 16);
|
||||
len |= (extract_field(GWBUF_DATA(pkt->next), 8) << 16);
|
||||
len = EXTRACT16(pdata);
|
||||
len |= (*(uint8_t *)GWBUF_DATA(pkt->next) << 16);
|
||||
}
|
||||
else if (reslen == 1)
|
||||
{
|
||||
len = extract_field(pdata, 8);
|
||||
len |= (extract_field(GWBUF_DATA(pkt->next), 16) << 8);
|
||||
len = *pdata;
|
||||
len |= (EXTRACT16(GWBUF_DATA(pkt->next)) << 8);
|
||||
}
|
||||
len += 4; // Allow space for the header
|
||||
}
|
||||
else
|
||||
{
|
||||
len = extract_field(pdata, 24) + 4;
|
||||
len = EXTRACT24(pdata) + 4;
|
||||
}
|
||||
|
||||
if (reslen < len && pkt_length >= len)
|
||||
@ -779,18 +779,18 @@ static REP_HEADER phdr;
|
||||
* @param hdr The packet header to populate
|
||||
*/
|
||||
void
|
||||
blr_extract_header(uint8_t *ptr, REP_HEADER *hdr)
|
||||
blr_extract_header(register uint8_t *ptr, register REP_HEADER *hdr)
|
||||
{
|
||||
|
||||
hdr->payload_len = extract_field(ptr, 24);
|
||||
hdr->payload_len = EXTRACT24(ptr);
|
||||
hdr->seqno = ptr[3];
|
||||
hdr->ok = ptr[4];
|
||||
hdr->timestamp = extract_field(&ptr[5], 32);
|
||||
hdr->timestamp = EXTRACT32(&ptr[5]);
|
||||
hdr->event_type = ptr[9];
|
||||
hdr->serverid = extract_field(&ptr[10], 32);
|
||||
hdr->event_size = extract_field(&ptr[14], 32);
|
||||
hdr->next_pos = extract_field(&ptr[18], 32);
|
||||
hdr->flags = extract_field(&ptr[22], 16);
|
||||
hdr->serverid = EXTRACT32(&ptr[10]);
|
||||
hdr->event_size = EXTRACT32(&ptr[14]);
|
||||
hdr->next_pos = EXTRACT32(&ptr[18]);
|
||||
hdr->flags = EXTRACT16(&ptr[22]);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -892,13 +892,18 @@ blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *
|
||||
{
|
||||
GWBUF *pkt;
|
||||
uint8_t *buf;
|
||||
ROUTER_SLAVE *slave;
|
||||
ROUTER_SLAVE *slave, *nextslave;
|
||||
int action;
|
||||
|
||||
spinlock_acquire(&router->lock);
|
||||
slave = router->slaves;
|
||||
while (slave)
|
||||
{
|
||||
if (slave->state != BLRS_DUMPING)
|
||||
{
|
||||
slave = slave->next;
|
||||
continue;
|
||||
}
|
||||
spinlock_acquire(&slave->catch_lock);
|
||||
if ((slave->cstate & (CS_UPTODATE|CS_DIST)) == CS_UPTODATE)
|
||||
{
|
||||
@ -986,6 +991,7 @@ int action;
|
||||
*/
|
||||
if (slave->cstate & CS_UPTODATE)
|
||||
{
|
||||
nextslave = slave->next;
|
||||
spinlock_release(&router->lock);
|
||||
LOGIF(LD, (skygw_log_write_flush(LOGFILE_DEBUG,
|
||||
"Force slave %d into catchup mode %s@%d\n",
|
||||
@ -998,9 +1004,18 @@ int action;
|
||||
spinlock_acquire(&router->lock);
|
||||
slave = router->slaves;
|
||||
if (slave)
|
||||
continue;
|
||||
{
|
||||
while (slave && slave != nextslave)
|
||||
slave = slave->next;
|
||||
if (slave)
|
||||
continue;
|
||||
else
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -473,12 +473,13 @@ blr_slave_binlog_dump(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue
|
||||
{
|
||||
GWBUF *resp;
|
||||
uint8_t *ptr;
|
||||
int len, flags, serverid, rval;
|
||||
int len, flags, serverid, rval, binlognamelen;
|
||||
REP_HEADER hdr;
|
||||
uint32_t chksum;
|
||||
|
||||
ptr = GWBUF_DATA(queue);
|
||||
len = extract_field(ptr, 24);
|
||||
binlognamelen = len - 11;
|
||||
ptr += 4; // Skip length and sequence number
|
||||
if (*ptr++ != COM_BINLOG_DUMP)
|
||||
{
|
||||
@ -495,15 +496,16 @@ uint32_t chksum;
|
||||
ptr += 2;
|
||||
serverid = extract_field(ptr, 32);
|
||||
ptr += 4;
|
||||
strncpy(slave->binlogfile, (char *)ptr, BINLOG_FNAMELEN);
|
||||
strncpy(slave->binlogfile, (char *)ptr, binlognamelen);
|
||||
slave->binlogfile[binlognamelen] = 0;
|
||||
|
||||
slave->state = BLRS_DUMPING;
|
||||
slave->seqno = 1;
|
||||
|
||||
if (slave->nocrc)
|
||||
len = 0x2b;
|
||||
len = 19 + 8 + binlognamelen;
|
||||
else
|
||||
len = 0x2f;
|
||||
len = 19 + 8 + 4 + binlognamelen;
|
||||
|
||||
// Build a fake rotate event
|
||||
resp = gwbuf_alloc(len + 5);
|
||||
@ -758,7 +760,8 @@ struct timespec req;
|
||||
slave->cstate &= ~CS_BUSY;
|
||||
spinlock_release(&slave->catch_lock);
|
||||
|
||||
close(fd);
|
||||
if (fd != -1)
|
||||
close(fd);
|
||||
poll_fake_write_event(slave->dcb);
|
||||
if (record)
|
||||
{
|
||||
@ -831,14 +834,19 @@ ROUTER_INSTANCE *router = slave->router;
|
||||
* Rotate the slave to the new binlog file
|
||||
*
|
||||
* @param slave The slave instance
|
||||
* @param ptr The rotate event (minux header and OK byte)
|
||||
* @param ptr The rotate event (minus header and OK byte)
|
||||
*/
|
||||
void
|
||||
blr_slave_rotate(ROUTER_SLAVE *slave, uint8_t *ptr)
|
||||
{
|
||||
int len = EXTRACT24(ptr + 9); // Extract the event length
|
||||
|
||||
len = len - (19 + 8 + 4); // Remove length of header, checksum and position
|
||||
if (len > BINLOG_FNAMELEN)
|
||||
len = BINLOG_FNAMELEN;
|
||||
ptr += 19; // Skip header
|
||||
slave->binlog_pos = extract_field(ptr, 32);
|
||||
slave->binlog_pos += (extract_field(ptr+4, 32) << 32);
|
||||
memcpy(slave->binlogfile, ptr + 8, BINLOG_FNAMELEN);
|
||||
slave->binlogfile[BINLOG_FNAMELEN] = 0;
|
||||
memcpy(slave->binlogfile, ptr + 8, len);
|
||||
slave->binlogfile[len] = 0;
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user