|
|
|
|
@ -55,21 +55,57 @@
|
|
|
|
|
#include <skygw_utils.h>
|
|
|
|
|
#include <log_manager.h>
|
|
|
|
|
|
|
|
|
|
#include <rdtsc.h>
|
|
|
|
|
|
|
|
|
|
/* Temporary requirement for auth data */
|
|
|
|
|
#include <mysql_client_server_protocol.h>
|
|
|
|
|
|
|
|
|
|
#define SAMPLE_COUNT 10000
|
|
|
|
|
CYCLES samples[10][SAMPLE_COUNT];
|
|
|
|
|
int sample_index[10] = { 0, 0, 0 };
|
|
|
|
|
|
|
|
|
|
#define LOGD_SLAVE_CATCHUP1 0
|
|
|
|
|
#define LOGD_SLAVE_CATCHUP2 1
|
|
|
|
|
#define LOGD_DISTRIBUTE 2
|
|
|
|
|
#define LOGD_FILE_FLUSH 3
|
|
|
|
|
|
|
|
|
|
SPINLOCK logspin = SPINLOCK_INIT;
|
|
|
|
|
|
|
|
|
|
void
|
|
|
|
|
log_duration(int sample, CYCLES duration)
|
|
|
|
|
{
|
|
|
|
|
char fname[100];
|
|
|
|
|
int i;
|
|
|
|
|
FILE *fp;
|
|
|
|
|
|
|
|
|
|
spinlock_acquire(&logspin);
|
|
|
|
|
samples[sample][sample_index[sample]++] = duration;
|
|
|
|
|
if (sample_index[sample] == SAMPLE_COUNT)
|
|
|
|
|
{
|
|
|
|
|
sprintf(fname, "binlog_profile.%d", sample);
|
|
|
|
|
if ((fp = fopen(fname, "a")) != NULL)
|
|
|
|
|
{
|
|
|
|
|
for (i = 0; i < SAMPLE_COUNT; i++)
|
|
|
|
|
fprintf(fp, "%ld\n", samples[sample][i]);
|
|
|
|
|
fclose(fp);
|
|
|
|
|
}
|
|
|
|
|
sample_index[sample] = 0;
|
|
|
|
|
}
|
|
|
|
|
spinlock_release(&logspin);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
extern int lm_enabled_logfiles_bitmask;
|
|
|
|
|
|
|
|
|
|
static GWBUF *blr_make_query(char *statement);
|
|
|
|
|
static GWBUF *blr_make_registration(ROUTER_INSTANCE *router);
|
|
|
|
|
static GWBUF *blr_make_binlog_dump(ROUTER_INSTANCE *router);
|
|
|
|
|
static void encode_value(unsigned char *data, unsigned int value, int len);
|
|
|
|
|
static void blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt);
|
|
|
|
|
void encode_value(unsigned char *data, unsigned int value, int len);
|
|
|
|
|
void blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt);
|
|
|
|
|
static void blr_rotate_event(ROUTER_INSTANCE *router, uint8_t *pkt, REP_HEADER *hdr);
|
|
|
|
|
static void blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr);
|
|
|
|
|
void blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr);
|
|
|
|
|
static void *CreateMySQLAuthData(char *username, char *password, char *database);
|
|
|
|
|
static void blr_extract_header(uint8_t *pkt, REP_HEADER *hdr);
|
|
|
|
|
static uint32_t extract_field(uint8_t *src, int bits);
|
|
|
|
|
void blr_extract_header(uint8_t *pkt, REP_HEADER *hdr);
|
|
|
|
|
inline uint32_t extract_field(uint8_t *src, int bits);
|
|
|
|
|
static void blr_log_packet(logfile_id_t file, char *msg, uint8_t *ptr, int len);
|
|
|
|
|
|
|
|
|
|
static int keepalive = 1;
|
|
|
|
|
@ -460,7 +496,7 @@ int len = 0x1b;
|
|
|
|
|
* @param value The value to pack
|
|
|
|
|
* @param len Number of bits to encode value into
|
|
|
|
|
*/
|
|
|
|
|
static void
|
|
|
|
|
void
|
|
|
|
|
encode_value(unsigned char *data, unsigned int value, int len)
|
|
|
|
|
{
|
|
|
|
|
while (len > 0)
|
|
|
|
|
@ -478,7 +514,7 @@ encode_value(unsigned char *data, unsigned int value, int len)
|
|
|
|
|
* @param router The router instance
|
|
|
|
|
* @param pkt The binlog records
|
|
|
|
|
*/
|
|
|
|
|
static void
|
|
|
|
|
void
|
|
|
|
|
blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
|
|
|
|
|
{
|
|
|
|
|
uint8_t *msg = NULL, *ptr, *pdata;
|
|
|
|
|
@ -766,7 +802,9 @@ static REP_HEADER phdr;
|
|
|
|
|
{
|
|
|
|
|
ss_dassert(pkt_length == 0);
|
|
|
|
|
}
|
|
|
|
|
{ CYCLES start = rdtsc();
|
|
|
|
|
blr_file_flush(router);
|
|
|
|
|
log_duration(LOGD_FILE_FLUSH, rdtsc() - start); }
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
@ -775,7 +813,7 @@ static REP_HEADER phdr;
|
|
|
|
|
* @param pkt The incoming packet in a GWBUF chain
|
|
|
|
|
* @param hdr The packet header to populate
|
|
|
|
|
*/
|
|
|
|
|
static void
|
|
|
|
|
void
|
|
|
|
|
blr_extract_header(uint8_t *ptr, REP_HEADER *hdr)
|
|
|
|
|
{
|
|
|
|
|
|
|
|
|
|
@ -796,10 +834,10 @@ blr_extract_header(uint8_t *ptr, REP_HEADER *hdr)
|
|
|
|
|
* @param src The raw packet source
|
|
|
|
|
* @param birs The number of bits to extract (multiple of 8)
|
|
|
|
|
*/
|
|
|
|
|
static uint32_t
|
|
|
|
|
extract_field(uint8_t *src, int bits)
|
|
|
|
|
inline uint32_t
|
|
|
|
|
extract_field(register uint8_t *src, int bits)
|
|
|
|
|
{
|
|
|
|
|
uint32_t rval = 0, shift = 0;
|
|
|
|
|
register uint32_t rval = 0, shift = 0;
|
|
|
|
|
|
|
|
|
|
while (bits > 0)
|
|
|
|
|
{
|
|
|
|
|
@ -884,14 +922,16 @@ MYSQL_session *auth_info;
|
|
|
|
|
* @param hdr The replication event header
|
|
|
|
|
* @param ptr The raw replication event data
|
|
|
|
|
*/
|
|
|
|
|
static void
|
|
|
|
|
void
|
|
|
|
|
blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
|
|
|
|
|
{
|
|
|
|
|
GWBUF *pkt;
|
|
|
|
|
uint8_t *buf;
|
|
|
|
|
ROUTER_SLAVE *slave;
|
|
|
|
|
int action;
|
|
|
|
|
CYCLES entry;
|
|
|
|
|
|
|
|
|
|
entry = rdtsc();
|
|
|
|
|
spinlock_acquire(&router->lock);
|
|
|
|
|
slave = router->slaves;
|
|
|
|
|
while (slave)
|
|
|
|
|
@ -945,12 +985,16 @@ int action;
|
|
|
|
|
spinlock_acquire(&slave->catch_lock);
|
|
|
|
|
if (slave->overrun)
|
|
|
|
|
{
|
|
|
|
|
CYCLES cycle_start, cycles;
|
|
|
|
|
slave->stats.n_overrun++;
|
|
|
|
|
slave->overrun = 0;
|
|
|
|
|
spinlock_release(&router->lock);
|
|
|
|
|
slave->cstate &= ~(CS_UPTODATE|CS_DIST);
|
|
|
|
|
spinlock_release(&slave->catch_lock);
|
|
|
|
|
cycle_start = rdtsc();
|
|
|
|
|
blr_slave_catchup(router, slave);
|
|
|
|
|
cycles = rdtsc() - cycle_start;
|
|
|
|
|
log_duration(LOGD_SLAVE_CATCHUP2, cycles);
|
|
|
|
|
spinlock_acquire(&router->lock);
|
|
|
|
|
slave = router->slaves;
|
|
|
|
|
if (slave)
|
|
|
|
|
@ -983,6 +1027,7 @@ int action;
|
|
|
|
|
*/
|
|
|
|
|
if (slave->cstate & CS_UPTODATE)
|
|
|
|
|
{
|
|
|
|
|
CYCLES cycle_start, cycles;
|
|
|
|
|
spinlock_release(&router->lock);
|
|
|
|
|
LOGIF(LD, (skygw_log_write_flush(LOGFILE_DEBUG,
|
|
|
|
|
"Force slave %d into catchup mode %s@%d\n",
|
|
|
|
|
@ -991,7 +1036,10 @@ int action;
|
|
|
|
|
spinlock_acquire(&slave->catch_lock);
|
|
|
|
|
slave->cstate &= ~(CS_UPTODATE|CS_DIST);
|
|
|
|
|
spinlock_release(&slave->catch_lock);
|
|
|
|
|
cycle_start = rdtsc();
|
|
|
|
|
blr_slave_catchup(router, slave);
|
|
|
|
|
cycles = rdtsc() - cycle_start;
|
|
|
|
|
log_duration(LOGD_SLAVE_CATCHUP1, cycles);
|
|
|
|
|
spinlock_acquire(&router->lock);
|
|
|
|
|
slave = router->slaves;
|
|
|
|
|
if (slave)
|
|
|
|
|
@ -1005,6 +1053,7 @@ int action;
|
|
|
|
|
slave = slave->next;
|
|
|
|
|
}
|
|
|
|
|
spinlock_release(&router->lock);
|
|
|
|
|
log_duration(LOGD_DISTRIBUTE, rdtsc() - entry);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void
|
|
|
|
|
|