Develop merge

Develop merge:
this includes the new handling of events larger than 16MBytes
This commit is contained in:
MassimilianoPinto
2016-12-05 16:38:48 +01:00
77 changed files with 1178 additions and 862 deletions

View File

@ -1,8 +1,8 @@
if(BUILD_AVRO)
add_subdirectory(avro)
add_subdirectory(avrorouter)
endif()
if(BUILD_BINLOG)
add_subdirectory(binlog)
add_subdirectory(binlogrouter)
endif()
add_subdirectory(cli)

View File

@ -1,7 +1,7 @@
if(AVRO_FOUND AND JANSSON_FOUND)
include_directories(${AVRO_INCLUDE_DIR})
include_directories(${JANSSON_INCLUDE_DIR})
add_library(avrorouter SHARED avro.c ../binlog/binlog_common.c avro_client.c avro_schema.c avro_rbr.c avro_file.c avro_index.c)
add_library(avrorouter SHARED avro.c ../binlogrouter/binlog_common.c avro_client.c avro_schema.c avro_rbr.c avro_file.c avro_index.c)
set_target_properties(avrorouter PROPERTIES VERSION "1.0.0")
set_target_properties(avrorouter PROPERTIES LINK_FLAGS -Wl,-z,defs)
target_link_libraries(avrorouter maxscale-common ${JANSSON_LIBRARIES} ${AVRO_LIBRARIES} maxavro sqlite3 lzma)

View File

@ -568,7 +568,6 @@ createInstance(SERVICE *service, char **options)
inst->reconnect_pending = 0;
inst->handling_threads = 0;
inst->rotating = 0;
inst->residual = NULL;
inst->slaves = NULL;
inst->next = NULL;
inst->lastEventTimestamp = 0;
@ -1847,7 +1846,7 @@ static void rses_end_locked_router_action(ROUTER_SLAVE *rses)
static uint64_t getCapabilities(void)
{
return RCAP_TYPE_NO_RSESSION;
return RCAP_TYPE_NO_RSESSION | RCAP_TYPE_CONTIGUOUS_OUTPUT;
}
/**
@ -2413,13 +2412,6 @@ destroyInstance(ROUTER *instance)
}
}
/* Discard the queued residual data */
while (inst->residual)
{
inst->residual = gwbuf_consume(inst->residual, GWBUF_LENGTH(inst->residual));
}
inst->residual = NULL;
MXS_INFO("%s is being stopped by MaxScale shudown. Disconnecting from master %s:%d, "
"read up to log %s, pos %lu, transaction safe pos %lu",
inst->service->name,

View File

@ -251,12 +251,9 @@ static const char BLR_DBUSERS_FILE[] = "dbusers";
/** Possible states of an event sent by the master */
enum blr_event_state
{
BLR_EVENT_DONE, /*< No event being processed */
BLR_EVENT_STARTED, /*< The first packet of an event which spans multiple packets
* has been received */
BLR_EVENT_STARTED, /*< The first packet of an event has been received */
BLR_EVENT_ONGOING, /*< Other packets of a multi-packet event are being processed */
BLR_EVENT_COMPLETE /*< A multi-packet event has been successfully processed
* but the router is not yet ready to process another one */
BLR_EVENT_DONE, /*< The complete event was received */
};
/* Master Server configuration struct */
@ -524,19 +521,14 @@ typedef struct router_instance
unsigned int master_state; /*< State of the master FSM */
uint8_t lastEventReceived; /*< Last even received */
uint32_t lastEventTimestamp; /*< Timestamp from last event */
GWBUF *residual; /*< Any residual binlog event */
MASTER_RESPONSES saved_master; /*< Saved master responses */
char *binlogdir; /*< The directory with the binlog files */
SPINLOCK binlog_lock; /*< Lock to control update of the binlog position */
int trx_safe; /*< Detect and handle partial transactions */
int pending_transaction; /*< Pending transaction */
enum blr_event_state master_event_state; /*< Packet read state */
uint32_t stored_checksum; /*< The current value of the checksum */
uint8_t partial_checksum[MYSQL_CHECKSUM_LEN]; /*< The partial value of the checksum
* received from the master */
uint8_t partial_checksum_bytes; /*< How many bytes of the checksum we have read */
uint64_t checksum_size; /*< Data size for the checksum */
REP_HEADER stored_header; /*< Relication header of the event the master is sending */
GWBUF *stored_event; /*< Buffer where partial events are stored */
uint64_t last_safe_pos; /* last committed transaction */
char binlog_name[BINLOG_FNAMELEN + 1];
/*< Name of the current binlog file */

View File

@ -104,8 +104,6 @@ static void blr_extract_header_semisync(uint8_t *pkt, REP_HEADER *hdr);
static int blr_send_semisync_ack (ROUTER_INSTANCE *router, uint64_t pos);
static int blr_get_master_semisync(GWBUF *buf);
int blr_write_data_into_binlog(ROUTER_INSTANCE *router, uint32_t data_len, uint8_t *buf);
void extract_checksum(ROUTER_INSTANCE* router, uint8_t *cksumptr, uint8_t len);
static void blr_terminate_master_replication(ROUTER_INSTANCE *router, uint8_t* ptr, int len);
void blr_notify_all_slaves(ROUTER_INSTANCE *router);
extern bool blr_notify_waiting_slave(ROUTER_SLAVE *slave);
@ -168,13 +166,6 @@ blr_start_master(void* data)
}
router->master_state = BLRM_CONNECTING;
/* Discard the queued residual data */
while (router->residual)
{
router->residual = gwbuf_consume(router->residual, GWBUF_LENGTH(router->residual));
}
router->residual = NULL;
spinlock_release(&router->lock);
if ((client = dcb_alloc(DCB_ROLE_INTERNAL, NULL)) == NULL)
{
@ -242,13 +233,6 @@ blr_restart_master(ROUTER_INSTANCE *router)
{
dcb_close(router->client);
/* Discard the queued residual data */
while (router->residual)
{
router->residual = gwbuf_consume(router->residual, GWBUF_LENGTH(router->residual));
}
router->residual = NULL;
/* Now it is safe to unleash other threads on this router instance */
spinlock_acquire(&router->lock);
router->reconnect_pending = 0;
@ -335,6 +319,8 @@ blr_master_close(ROUTER_INSTANCE *router)
dcb_close(router->master);
router->master_state = BLRM_UNCONNECTED;
router->master_event_state = BLR_EVENT_DONE;
gwbuf_free(router->stored_event);
router->stored_event = NULL;
}
/**
@ -987,6 +973,63 @@ encode_value(unsigned char *data, unsigned int value, int len)
}
}
/**
* Check that the stored event checksum matches the calculated checksum
*/
static bool verify_checksum(ROUTER_INSTANCE *router, size_t len, uint8_t *ptr)
{
bool rval = true;
uint32_t offset = MYSQL_HEADER_LEN + 1;
uint32_t size = len - (offset + MYSQL_CHECKSUM_LEN);
uint32_t checksum = crc32(0L, ptr + offset, size);
uint32_t pktsum = EXTRACT32(ptr + offset + size);
if (pktsum != checksum)
{
rval = false;
MXS_ERROR("%s: Checksum error in event from master, "
"binlog %s @ %lu. Closing master connection.",
router->service->name, router->binlog_name,
router->current_pos);
router->stats.n_badcrc++;
}
return rval;
}
/**
* @brief Reset router errors
*
* @param router Router instance
* @param hdr Replication header
*/
static void reset_errors(ROUTER_INSTANCE *router, REP_HEADER *hdr)
{
spinlock_acquire(&router->lock);
/* set mysql errno to 0 */
router->m_errno = 0;
/* Remove error message */
if (router->m_errmsg)
{
MXS_FREE(router->m_errmsg);
}
router->m_errmsg = NULL;
spinlock_release(&router->lock);
#ifdef SHOW_EVENTS
printf("blr: len %lu, event type 0x%02x, flags 0x%04x, "
"event size %d, event timestamp %lu\n",
(unsigned long)len - 4,
hdr->event_type,
hdr->flags,
hdr->event_size,
(unsigned long)hdr->timestamp);
#endif
}
/**
* blr_handle_binlog_record - we have received binlog records from
* the master and we must now work out what to do with them.
@ -997,134 +1040,24 @@ encode_value(unsigned char *data, unsigned int value, int len)
void
blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
{
uint8_t *msg = NULL, *ptr, *pdata;
uint8_t *msg = NULL, *ptr;
REP_HEADER hdr;
unsigned int len = 0, reslen;
unsigned int pkt_length;
int no_residual = 1;
int preslen = -1;
unsigned int len = 0;
int prev_length = -1;
int n_bufs = -1, pn_bufs = -1;
int check_packet_len;
int semisync_bytes;
int semi_sync_send_ack = 0;
/*
* Prepend any residual buffer to the buffer chain we have
* been called with.
*/
if (router->residual)
{
pkt = gwbuf_append(router->residual, pkt);
router->residual = NULL;
no_residual = 0;
}
pkt_length = gwbuf_length(pkt);
/*
* Loop over all the packets while we still have some data
* and the packet length is enough to hold a replication event
* header.
*/
while (pkt && pkt_length > 24)
while (pkt)
{
reslen = GWBUF_LENGTH(pkt);
pdata = GWBUF_DATA(pkt);
if (reslen < 3) // Payload length straddles buffers
{
/* Get the length of the packet from the residual and new packet */
if (reslen >= 3)
{
len = EXTRACT24(pdata);
}
else if (reslen == 2)
{
len = EXTRACT16(pdata);
len |= (*(uint8_t *)GWBUF_DATA(pkt->next) << 16);
}
else if (reslen == 1)
{
len = *pdata;
len |= (EXTRACT16(GWBUF_DATA(pkt->next)) << 8);
}
len += 4; // Allow space for the header
}
else
{
len = EXTRACT24(pdata) + 4;
}
/* len is now the payload length for the packet we are working on */
if (reslen < len && pkt_length >= len)
{
/*
* The message is contained in more than the current
* buffer, however we have the complete messasge in
* this buffer and the chain of remaining buffers.
*
* Allocate a contiguous buffer for the binlog message
* and copy the complete message into this buffer.
*/
int msg_remainder = len;
GWBUF *p = pkt;
if ((msg = MXS_MALLOC(len)) == NULL)
{
break;
}
n_bufs = 0;
ptr = msg;
while (p && msg_remainder > 0)
{
int plen = GWBUF_LENGTH(p);
int n = (msg_remainder > plen ? plen : msg_remainder);
memcpy(ptr, GWBUF_DATA(p), n);
msg_remainder -= n;
ptr += n;
if (msg_remainder > 0)
{
p = p->next;
}
n_bufs++;
}
if (msg_remainder)
{
MXS_ERROR("Expected entire message in buffer "
"chain, but failed to create complete "
"message as expected. %s @ %lu",
router->binlog_name,
router->current_pos);
MXS_FREE(msg);
/* msg = NULL; Not needed unless msg will be referred to again */
break;
}
ptr = msg;
}
else if (reslen < len)
{
/*
* The message is not fully contained in the current
* and we do not have the complete message in the
* buffer chain. Therefore we must stop processing
* until we receive the next buffer.
*/
router->stats.n_residuals++;
MXS_DEBUG("Residual data left after %lu records. %s @ %lu",
router->stats.n_binlogs,
router->binlog_name, router->current_pos);
break;
}
else
{
/*
* The message is fully contained in the current buffer
*/
ptr = pdata;
n_bufs = 1;
}
ptr = GWBUF_DATA(pkt);
len = gw_mysql_get_byte3(ptr);
semisync_bytes = 0;
/*
@ -1135,7 +1068,7 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
if (len < BINLOG_EVENT_HDR_LEN && router->master_event_state != BLR_EVENT_ONGOING)
{
char *event_msg = "";
char *event_msg = "unknown";
/* Packet is too small to be a binlog event */
if (ptr[4] == 0xfe) /* EOF Packet */
@ -1147,11 +1080,13 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
event_msg = "error";
}
MXS_NOTICE("Non-event message (%s) from master.", event_msg);
pkt = gwbuf_consume(pkt, len);
}
else
{
if (router->master_event_state == BLR_EVENT_DONE)
{
/** This is the start of a new event */
spinlock_acquire(&router->lock);
router->stats.n_binlogs++;
router->stats.n_binlogs_ses++;
@ -1187,20 +1122,16 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
/* Sanity check */
if (hdr.ok == 0)
{
if (hdr.event_size != len - check_packet_len &&
if (hdr.event_size != len - (check_packet_len - MYSQL_HEADER_LEN) &&
(hdr.event_size + (check_packet_len - MYSQL_HEADER_LEN)) < MYSQL_PACKET_LENGTH_MAX)
{
MXS_ERROR("Packet length is %d, but event size is %d, "
"binlog file %s position %lu "
"reslen is %d and preslen is %d, "
"length of previous event %d. %s",
"binlog file %s position %lu, "
"length of previous event %d.",
len, hdr.event_size,
router->binlog_name,
router->current_pos,
reslen, preslen, prev_length,
(prev_length == -1 ?
(no_residual ? "No residual data from previous call" :
"Residual data from previous call") : ""));
prev_length);
blr_log_packet(LOG_ERR, "Packet:", ptr, len);
@ -1216,18 +1147,13 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
break;
}
else if (hdr.event_size + (check_packet_len - MYSQL_HEADER_LEN) >= MYSQL_PACKET_LENGTH_MAX)
{
router->master_event_state = BLR_EVENT_STARTED;
/** Store the header for later use */
memcpy(&router->stored_header, &hdr, sizeof(hdr));
}
/** Prepare the checksum variables for this event */
router->stored_checksum = crc32(0L, NULL, 0);
router->checksum_size = hdr.event_size - MYSQL_CHECKSUM_LEN;
router->partial_checksum_bytes = 0;
/** This is the first (and possibly last) packet of a replication
* event. We store the header in case the event is large and
* it is transmitted over multiple network packets. */
router->master_event_state = BLR_EVENT_STARTED;
memcpy(&router->stored_header, &hdr, sizeof(hdr));
reset_errors(router, &hdr);
}
else
{
@ -1236,159 +1162,96 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
gwbuf_free(pkt);
pkt = NULL;
pkt_length = 0;
break;
}
if (hdr.ok == 0)
{
spinlock_acquire(&router->lock);
/* set mysql errno to 0 */
router->m_errno = 0;
/* Remove error message */
if (router->m_errmsg)
{
MXS_FREE(router->m_errmsg);
}
router->m_errmsg = NULL;
spinlock_release(&router->lock);
#ifdef SHOW_EVENTS
printf("blr @ %lu: len %lu, event type 0x%02x, flags 0x%04x, "
"event size %d, event timestamp %lu, next pos %lu\n",
router->current_pos,
(unsigned long)len - 4,
hdr.event_type,
hdr.flags,
hdr.event_size,
(unsigned long)hdr.timestamp,
(unsigned long)hdr.next_pos);
#endif
}
}
else
{
/** We're processing a multi-packet replication event */
ss_dassert(router->master_event_state == BLR_EVENT_ONGOING);
}
/* pending large event */
if (router->master_event_state != BLR_EVENT_DONE)
/** Gather the event into one big buffer */
GWBUF *part = gwbuf_split(&pkt, len + MYSQL_HEADER_LEN);
if (semisync_bytes)
{
if (len - MYSQL_HEADER_LEN < MYSQL_PACKET_LENGTH_MAX)
/** Consume the two semi-sync bytes */
part = gwbuf_consume(part, semisync_bytes);
}
ss_dassert(router->master_event_state == BLR_EVENT_STARTED ||
router->master_event_state == BLR_EVENT_ONGOING);
if (router->master_event_state == BLR_EVENT_ONGOING)
{
/**
* Consume the network header so that we can append the raw
* event data to the original buffer. This allows both checksum
* calculations and encryption to process it as a contiguous event
*/
part = gwbuf_consume(part, MYSQL_HEADER_LEN);
}
router->stored_event = gwbuf_append(router->stored_event, part);
if (len < MYSQL_PACKET_LENGTH_MAX)
{
/**
* This is either the only packet for the event or the last
* packet in a series for this event. The buffer now contains
* the network header of the first packet (4 bytes) and one OK byte.
* The semi-sync bytes are always consumed at an earlier stage.
*/
ss_dassert(router->master_event_state != BLR_EVENT_DONE);
if (router->master_event_state != BLR_EVENT_STARTED)
{
/** This is the last packet, we can now proceed to distribute
* the event afer it has been written to disk */
ss_dassert(router->master_event_state != BLR_EVENT_COMPLETE);
router->master_event_state = BLR_EVENT_COMPLETE;
/**
* This is not the first packet for this event. We must use
* the stored header.
*/
memcpy(&hdr, &router->stored_header, sizeof(hdr));
}
else
{
/* current partial event is being written to disk file */
uint32_t offset = MYSQL_HEADER_LEN;
uint32_t extra_bytes = MYSQL_HEADER_LEN;
/** Don't write the OK byte into the binlog */
if (router->master_event_state == BLR_EVENT_STARTED)
{
offset = MYSQL_HEADER_LEN + 1;
router->master_event_state = BLR_EVENT_ONGOING;
extra_bytes = MYSQL_HEADER_LEN + 1;
}
ss_dassert(len - extra_bytes - semisync_bytes > 0);
uint32_t bytes_available = len - extra_bytes - semisync_bytes;
if (router->master_chksum)
{
uint32_t size = MXS_MIN(len - extra_bytes - semisync_bytes,
router->checksum_size);
router->stored_checksum = crc32(router->stored_checksum,
ptr + offset,
size);
router->checksum_size -= size;
if (router->checksum_size == 0 && size < bytes_available)
{
extract_checksum(router, ptr + offset + size,
bytes_available - size);
}
}
if (blr_write_data_into_binlog(router, bytes_available,
ptr + offset) == 0)
{
/** Failed to write to the binlog file, destroy the buffer
* chain and close the connection with the master */
while (pkt)
{
pkt = GWBUF_CONSUME_ALL(pkt);
}
blr_master_close(router);
blr_master_delayed_connect(router);
return;
}
pkt = gwbuf_consume(pkt, len);
pkt_length -= len;
continue;
}
/** The event is now complete */
router->master_event_state = BLR_EVENT_DONE;
}
else
{
/**
* This packet is a part of a series of packets that contain an
* event larger than MYSQL_PACKET_LENGTH_MAX bytes.
*
* For each partial event chunk, we remove the network header and
* append it to router->stored_event. The first event is an
* exception to this and it is appended as-is with the network
* header and the extra OK byte.
*/
ss_dassert(len == MYSQL_PACKET_LENGTH_MAX);
router->master_event_state = BLR_EVENT_ONGOING;
continue;
}
/*
* First check that the checksum we calculate matches the
* checksum in the packet we received.
/** We now have the complete event in one contiguous buffer */
router->stored_event = gwbuf_make_contiguous(router->stored_event);
ptr = GWBUF_DATA(router->stored_event);
/** len is now the length of the complete event plus 4 bytes of network
* header and one OK byte. Semi-sync bytes are never stored. */
len = gwbuf_length(router->stored_event);
/**
* If checksums are enabled, verify that the stored checksum
* matches the one we calculated
*/
if (router->master_chksum)
if (router->master_chksum && !verify_checksum(router, len, ptr))
{
uint32_t offset = MYSQL_HEADER_LEN;
uint32_t size = len - MYSQL_HEADER_LEN - MYSQL_CHECKSUM_LEN;
if (router->master_event_state == BLR_EVENT_DONE)
{
/** Set the pointer offset to the first byte after
* the header and OK byte */
offset = MYSQL_HEADER_LEN + 1;
size = len - (check_packet_len + MYSQL_CHECKSUM_LEN);
}
size = MXS_MIN(size, router->checksum_size);
if (router->checksum_size > 0)
{
router->stored_checksum = crc32(router->stored_checksum,
ptr + offset, size);
router->checksum_size -= size;
}
if(router->checksum_size == 0 && size < (len - offset - semisync_bytes))
{
extract_checksum(router, ptr + offset + size,
len - offset - size - semisync_bytes);
}
if (router->partial_checksum_bytes == MYSQL_CHECKSUM_LEN)
{
uint32_t pktsum = EXTRACT32(router->partial_checksum);
if (pktsum != router->stored_checksum)
{
router->stats.n_badcrc++;
MXS_FREE(msg);
/* msg = NULL; Not needed unless msg will be referred to again */
MXS_ERROR("%s: Checksum error in event from master, "
"binlog %s @ %lu. Closing master connection.",
router->service->name, router->binlog_name,
router->current_pos);
blr_master_close(router);
blr_master_delayed_connect(router);
return;
}
}
else
{
pkt = gwbuf_consume(pkt, len);
pkt_length -= len;
continue;
}
MXS_FREE(msg);
blr_master_close(router);
blr_master_delayed_connect(router);
return;
}
if (hdr.ok == 0)
@ -1583,16 +1446,6 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
}
else if (hdr.flags != LOG_EVENT_ARTIFICIAL_F)
{
ptr = ptr + 4; // Skip header
uint32_t offset = 4;
if (router->master_event_state == BLR_EVENT_STARTED ||
router->master_event_state == BLR_EVENT_DONE)
{
ptr++;
offset++;
}
if (hdr.event_type == ROTATE_EVENT)
{
spinlock_acquire(&router->binlog_lock);
@ -1600,54 +1453,36 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
spinlock_release(&router->binlog_lock);
}
/* Current event is being written to disk file.
* It is possible for an empty packet to be sent if an
* event is exactly 2^24 bytes long. In this case the
* empty packet should be discarded. */
if (len > MYSQL_HEADER_LEN &&
blr_write_binlog_record(router, &hdr, len - offset - semisync_bytes, ptr) == 0)
uint32_t offset = MYSQL_HEADER_LEN + 1; // Skip header and OK byte
/**
* Write the raw event data to disk without the network
* header or the OK byte
*/
if (blr_write_binlog_record(router, &hdr, len - offset, ptr + offset) == 0)
{
/*
* Failed to write to the
* binlog file, destroy the
* buffer chain and close the
* connection with the master
*/
while ((pkt = gwbuf_consume(pkt, GWBUF_LENGTH(pkt))) != NULL)
{
;
}
gwbuf_free(pkt);
blr_master_close(router);
blr_master_delayed_connect(router);
return;
}
/* Check for rotete event */
/* Check for rotate event */
if (hdr.event_type == ROTATE_EVENT)
{
if (!blr_rotate_event(router, ptr, &hdr))
{
/*
* Failed to write to the
* binlog file, destroy the
* buffer chain and close the
* connection with the master
*/
while ((pkt = gwbuf_consume(pkt, GWBUF_LENGTH(pkt))) != NULL)
{
;
}
gwbuf_free(pkt);
blr_master_close(router);
blr_master_delayed_connect(router);
return;
}
}
/* Handle semi-sync request fom master */
/* Handle semi-sync request from master */
if (router->master_semi_sync != MASTER_SEMISYNC_NOT_AVAILABLE &&
semi_sync_send_ack == BLR_MASTER_SEMI_SYNC_ACK_REQ &&
(router->master_event_state == BLR_EVENT_COMPLETE ||
router->master_event_state == BLR_EVENT_DONE))
(router->master_event_state == BLR_EVENT_DONE))
{
MXS_DEBUG("%s: binlog record in file %s, pos %lu has "
@ -1733,16 +1568,7 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
spinlock_release(&router->binlog_lock);
if (!blr_rotate_event(router, ptr, &hdr))
{
/*
* Failed to write to the
* binlog file, destroy the
* buffer chain and close the
* connection with the master
*/
while ((pkt = gwbuf_consume(pkt, GWBUF_LENGTH(pkt))) != NULL)
{
;
}
gwbuf_free(pkt);
blr_master_close(router);
blr_master_delayed_connect(router);
return;
@ -1750,17 +1576,15 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
}
}
}
/** A large event is now fully received and processed */
if (router->master_event_state == BLR_EVENT_COMPLETE)
{
router->master_event_state = BLR_EVENT_DONE;
}
}
else
{
blr_terminate_master_replication(router, ptr, len);
}
/** Finished processing the event */
gwbuf_free(router->stored_event);
router->stored_event = NULL;
}
if (msg)
@ -1768,33 +1592,8 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
MXS_FREE(msg);
msg = NULL;
}
prev_length = len;
while (len > 0)
{
unsigned int n, plen;
plen = GWBUF_LENGTH(pkt);
n = (plen < len ? plen : len);
pkt = gwbuf_consume(pkt, n);
len -= n;
pkt_length -= n;
}
preslen = reslen;
pn_bufs = n_bufs;
}
/*
* Check if we have a residual, part binlog message to deal with.
* Just simply store the GWBUF for next time
*/
if (pkt)
{
router->residual = pkt;
ss_dassert(pkt_length != 0);
}
else
{
ss_dassert(pkt_length == 0);
}
blr_file_flush(router);
}
@ -2242,13 +2041,6 @@ blr_stop_start_master(ROUTER_INSTANCE *router)
}
}
/* Discard the queued residual data */
while (router->residual)
{
router->residual = gwbuf_consume(router->residual, GWBUF_LENGTH(router->residual));
}
router->residual = NULL;
router->master_state = BLRM_UNCONNECTED;
spinlock_release(&router->lock);
@ -2598,26 +2390,6 @@ bool blr_send_event(blr_thread_role_t role,
return rval;
}
/**
* Extract the checksum from the binlogs
*
* This updates the internal state of the router and will allow us to detect
* if the checksum is split across two packets.
* @param router Router instance
* @param cksumptr Pointer to the checksum
* @param len How much of the data is readable
*/
void extract_checksum(ROUTER_INSTANCE* router, uint8_t *cksumptr, uint8_t len)
{
uint8_t *ptr = cksumptr;
while (ptr - cksumptr < len && router->partial_checksum_bytes < MYSQL_CHECKSUM_LEN)
{
router->partial_checksum[router->partial_checksum_bytes] = *ptr;
ptr++;
router->partial_checksum_bytes++;
}
}
/**
* Stop the slave connection and log errors
*

View File

@ -348,6 +348,7 @@ blr_slave_query(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
int query_len;
char *ptr;
extern char *strcasestr();
bool unexpected = true;
qtext = (char*)GWBUF_DATA(queue);
query_len = extract_field((uint8_t *)qtext, 24) - 1;
@ -534,6 +535,10 @@ blr_slave_query(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
return blr_slave_send_var_value(router, slave, heading, server_id, BLR_TYPE_INT);
}
else if (strcasestr(word, "binlog_gtid_pos"))
{
unexpected = false;
}
}
else if (strcasecmp(word, "SHOW") == 0)
{
@ -1122,7 +1127,17 @@ blr_slave_query(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
MXS_FREE(query_text);
query_text = strndup(qtext, query_len);
MXS_ERROR("Unexpected query from '%s'@'%s': %s", slave->dcb->user, slave->dcb->remote, query_text);
if (unexpected)
{
MXS_ERROR("Unexpected query from '%s'@'%s': %s", slave->dcb->user, slave->dcb->remote, query_text);
}
else
{
MXS_INFO("Unexpected query from '%s'@'%s', possibly a 10.1 slave: %s",
slave->dcb->user, slave->dcb->remote, query_text);
}
MXS_FREE(query_text);
blr_slave_send_error(router, slave,
"You have an error in your SQL syntax; Check the syntax "
@ -3444,13 +3459,6 @@ blr_stop_slave(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave)
}
}
/* Discard the queued residual data */
while (router->residual)
{
router->residual = gwbuf_consume(router->residual, GWBUF_LENGTH(router->residual));
}
router->residual = NULL;
/* Now it is safe to unleash other threads on this router instance */
router->reconnect_pending = 0;
router->active_logs = 0;

View File

@ -290,8 +290,8 @@ bool listfuncs_cb(const MODULECMD *cmd, void *data)
{
DCB *dcb = (DCB*)data;
dcb_printf(dcb, "%s::%s(", cmd->domain, cmd->identifier);
dcb_printf(dcb, "Command: %s %s\n", cmd->domain, cmd->identifier);
dcb_printf(dcb, "Parameters: ");
for (int i = 0; i < cmd->arg_count_max; i++)
{
@ -309,8 +309,7 @@ bool listfuncs_cb(const MODULECMD *cmd, void *data)
}
}
dcb_printf(dcb, ")\n");
dcb_printf(dcb, "\n\n");
for (int i = 0; i < cmd->arg_count_max; i++)
{
@ -334,9 +333,9 @@ bool listfuncs_cb(const MODULECMD *cmd, void *data)
return true;
}
void dListFunctions(DCB *dcb)
void dListCommands(DCB *dcb, const char *domain, const char *ident)
{
modulecmd_foreach(NULL, NULL, listfuncs_cb, dcb);
modulecmd_foreach(domain, ident, listfuncs_cb, dcb);
}
/**
@ -405,10 +404,13 @@ struct subcommand listoptions[] =
{0, 0, 0}
},
{
"functions", 0, 0, dListFunctions,
"List registered functions",
"List all registered functions",
{0}
"commands", 0, 2, dListCommands,
"List registered commands",
"Usage list commands [DOMAIN] [COMMAND]\n"
"Parameters:\n"
"DOMAIN Regular expressions for filtering module domains\n"
"COMMAND Regular expressions for filtering module commands\n",
{ARG_TYPE_STRING, ARG_TYPE_STRING}
},
{ EMPTY_OPTION}
};
@ -1081,6 +1083,22 @@ static void createListener(DCB *dcb, SERVICE *service, char *name, char *address
}
}
static void createMonitor(DCB *dcb, const char *name, const char *module)
{
if (monitor_find(name))
{
dcb_printf(dcb, "Monitor '%s' already exists\n", name);
}
else if (runtime_create_monitor(name, module))
{
dcb_printf(dcb, "Created monitor '%s'\n", name);
}
else
{
dcb_printf(dcb, "Failed to create monitor '%s', see log for more details\n", name);
}
}
struct subcommand createoptions[] =
{
{
@ -1127,6 +1145,16 @@ struct subcommand createoptions[] =
ARG_TYPE_STRING, ARG_TYPE_STRING, ARG_TYPE_STRING, ARG_TYPE_STRING,
}
},
{
"monitor", 2, 2, createMonitor,
"Create a new monitor",
"Usage: create monitor NAME MODULE\n"
"NAME Monitor name\n"
"MODULE Monitor module\n",
{
ARG_TYPE_STRING, ARG_TYPE_STRING
}
},
{
EMPTY_OPTION
}
@ -1162,6 +1190,22 @@ static void destroyListener(DCB *dcb, SERVICE *service, const char *name)
}
}
static void destroyMonitor(DCB *dcb, MONITOR *monitor)
{
char name[strlen(monitor->name) + 1];
strcpy(name, monitor->name);
if (runtime_destroy_monitor(monitor))
{
dcb_printf(dcb, "Destroyed monitor '%s'\n", name);
}
else
{
dcb_printf(dcb, "Failed to destroy monitor '%s', see log file for more details\n", name);
}
}
struct subcommand destroyoptions[] =
{
{
@ -1176,6 +1220,12 @@ struct subcommand destroyoptions[] =
"Usage: destroy listener SERVICE NAME",
{ARG_TYPE_SERVICE, ARG_TYPE_STRING}
},
{
"monitor", 1, 1, destroyMonitor,
"Destroy a monitor",
"Usage: destroy monitor NAME",
{ARG_TYPE_MONITOR}
},
{
EMPTY_OPTION
}
@ -1290,6 +1340,13 @@ static void alterMonitor(DCB *dcb, MONITOR *monitor, char *v1, char *v2, char *v
{
dcb_printf(dcb, "Error: Bad key-value parameter: %s=%s\n", key, value);
}
else if (!monitor->created_online)
{
dcb_printf(dcb, "Warning: Altered monitor '%s' which is in the "
"main\nconfiguration file. These changes will not be "
"persisted and need\nto be manually added or set again"
"after a restart.\n", monitor->name);
}
}
else
{
@ -1316,9 +1373,8 @@ struct subcommand alteroptions[] =
"monitor", 2, 12, alterMonitor,
"Alter monitor parameters",
"Usage: alter monitor NAME KEY=VALUE ...\n"
"This will alter an existing parameter of a monitor. The accepted values\n"
"for KEY are: 'user', 'password', 'monitor_interval',\n"
"'backend_connect_timeout', 'backend_write_timeout', 'backend_read_timeout'\n"
"This will alter an existing parameter of a monitor. To remove parameters,\n"
"pass an empty value for a key e.g. 'maxadmin alter monitor my-monitor my-key='\n"
"A maximum of 11 parameters can be changed at one time",
{ARG_TYPE_MONITOR, ARG_TYPE_STRING, ARG_TYPE_STRING, ARG_TYPE_STRING,
ARG_TYPE_STRING, ARG_TYPE_STRING, ARG_TYPE_STRING, ARG_TYPE_STRING,
@ -1335,9 +1391,9 @@ static inline bool requires_output_dcb(const MODULECMD *cmd)
return cmd->arg_count_max > 0 && MODULECMD_GET_TYPE(type) == MODULECMD_ARG_OUTPUT;
}
static void callFunction(DCB *dcb, char *domain, char *id, char *v3,
char *v4, char *v5, char *v6, char *v7, char *v8, char *v9,
char *v10, char *v11, char *v12)
static void callModuleCommand(DCB *dcb, char *domain, char *id, char *v3,
char *v4, char *v5, char *v6, char *v7, char *v8, char *v9,
char *v10, char *v11, char *v12)
{
const void *values[11] = {v3, v4, v5, v6, v7, v8, v9, v10, v11, v12};
const int valuelen = sizeof(values) / sizeof(values[0]);
@ -1354,7 +1410,7 @@ static void callFunction(DCB *dcb, char *domain, char *id, char *v3,
{
if (requires_output_dcb(cmd))
{
/** The function requires a DCB for output, add the client DCB
/** The command requires a DCB for output, add the client DCB
* as the first argument */
for (int i = valuelen - 1; i > 0; i--)
{
@ -1370,28 +1426,28 @@ static void callFunction(DCB *dcb, char *domain, char *id, char *v3,
{
if (!modulecmd_call_command(cmd, arg))
{
dcb_printf(dcb, "Failed to call function: %s\n", modulecmd_get_error());
dcb_printf(dcb, "Error: %s\n", modulecmd_get_error());
}
modulecmd_arg_free(arg);
}
else
{
dcb_printf(dcb, "Failed to parse arguments: %s\n", modulecmd_get_error());
dcb_printf(dcb, "Error: %s\n", modulecmd_get_error());
}
}
else
{
dcb_printf(dcb, "Function not found: %s\n", modulecmd_get_error());
dcb_printf(dcb, "Error: %s\n", modulecmd_get_error());
}
}
struct subcommand calloptions[] =
{
{
"function", 2, 12, callFunction,
"Call module function",
"Usage: call function NAMESPACE FUNCTION ARGS...\n"
"To list all registered functions, run 'list functions'.\n",
"command", 2, 12, callModuleCommand,
"Call module command",
"Usage: call command NAMESPACE COMMAND ARGS...\n"
"To list all registered commands, run 'list commands'.\n",
{ ARG_TYPE_STRING, ARG_TYPE_STRING, ARG_TYPE_STRING, ARG_TYPE_STRING,
ARG_TYPE_STRING, ARG_TYPE_STRING, ARG_TYPE_STRING, ARG_TYPE_STRING,
ARG_TYPE_STRING, ARG_TYPE_STRING, ARG_TYPE_STRING, ARG_TYPE_STRING}
@ -1703,6 +1759,7 @@ execute_cmd(CLI_SESSION *cli)
else
{
unsigned long arg_list[MAXARGS] = {};
bool ok = true;
for (int k = 0; k < cmds[i].options[j].argc_max && k < argc; k++)
{
@ -1710,72 +1767,75 @@ execute_cmd(CLI_SESSION *cli)
if (arg_list[k] == 0)
{
dcb_printf(dcb, "Invalid argument: %s\n", args[k + 2]);
break;
ok = false;
}
}
switch (cmds[i].options[j].argc_max)
if (ok)
{
case 0:
cmds[i].options[j].fn(dcb);
break;
case 1:
cmds[i].options[j].fn(dcb, arg_list[0]);
break;
case 2:
cmds[i].options[j].fn(dcb, arg_list[0], arg_list[1]);
break;
case 3:
cmds[i].options[j].fn(dcb, arg_list[0], arg_list[1], arg_list[2]);
break;
case 4:
cmds[i].options[j].fn(dcb, arg_list[0], arg_list[1], arg_list[2],
arg_list[3]);
break;
case 5:
cmds[i].options[j].fn(dcb, arg_list[0], arg_list[1], arg_list[2],
arg_list[3], arg_list[4]);
break;
case 6:
cmds[i].options[j].fn(dcb, arg_list[0], arg_list[1], arg_list[2],
arg_list[3], arg_list[4], arg_list[5]);
break;
case 7:
cmds[i].options[j].fn(dcb, arg_list[0], arg_list[1], arg_list[2],
arg_list[3], arg_list[4], arg_list[5],
arg_list[6]);
break;
case 8:
cmds[i].options[j].fn(dcb, arg_list[0], arg_list[1], arg_list[2],
arg_list[3], arg_list[4], arg_list[5],
arg_list[6], arg_list[7]);
break;
case 9:
cmds[i].options[j].fn(dcb, arg_list[0], arg_list[1], arg_list[2],
arg_list[3], arg_list[4], arg_list[5],
arg_list[6], arg_list[7], arg_list[8]);
break;
case 10:
cmds[i].options[j].fn(dcb, arg_list[0], arg_list[1], arg_list[2],
arg_list[3], arg_list[4], arg_list[5],
arg_list[6], arg_list[7], arg_list[8],
arg_list[9]);
break;
case 11:
cmds[i].options[j].fn(dcb, arg_list[0], arg_list[1], arg_list[2],
arg_list[3], arg_list[4], arg_list[5],
arg_list[6], arg_list[7], arg_list[8],
arg_list[9], arg_list[10]);
break;
case 12:
cmds[i].options[j].fn(dcb, arg_list[0], arg_list[1], arg_list[2],
arg_list[3], arg_list[4], arg_list[5],
arg_list[6], arg_list[7], arg_list[8],
arg_list[9], arg_list[10], arg_list[11]);
break;
default:
dcb_printf(dcb, "Error: Maximum argument count is %d.\n", MAXARGS);
break;
switch (cmds[i].options[j].argc_max)
{
case 0:
cmds[i].options[j].fn(dcb);
break;
case 1:
cmds[i].options[j].fn(dcb, arg_list[0]);
break;
case 2:
cmds[i].options[j].fn(dcb, arg_list[0], arg_list[1]);
break;
case 3:
cmds[i].options[j].fn(dcb, arg_list[0], arg_list[1], arg_list[2]);
break;
case 4:
cmds[i].options[j].fn(dcb, arg_list[0], arg_list[1], arg_list[2],
arg_list[3]);
break;
case 5:
cmds[i].options[j].fn(dcb, arg_list[0], arg_list[1], arg_list[2],
arg_list[3], arg_list[4]);
break;
case 6:
cmds[i].options[j].fn(dcb, arg_list[0], arg_list[1], arg_list[2],
arg_list[3], arg_list[4], arg_list[5]);
break;
case 7:
cmds[i].options[j].fn(dcb, arg_list[0], arg_list[1], arg_list[2],
arg_list[3], arg_list[4], arg_list[5],
arg_list[6]);
break;
case 8:
cmds[i].options[j].fn(dcb, arg_list[0], arg_list[1], arg_list[2],
arg_list[3], arg_list[4], arg_list[5],
arg_list[6], arg_list[7]);
break;
case 9:
cmds[i].options[j].fn(dcb, arg_list[0], arg_list[1], arg_list[2],
arg_list[3], arg_list[4], arg_list[5],
arg_list[6], arg_list[7], arg_list[8]);
break;
case 10:
cmds[i].options[j].fn(dcb, arg_list[0], arg_list[1], arg_list[2],
arg_list[3], arg_list[4], arg_list[5],
arg_list[6], arg_list[7], arg_list[8],
arg_list[9]);
break;
case 11:
cmds[i].options[j].fn(dcb, arg_list[0], arg_list[1], arg_list[2],
arg_list[3], arg_list[4], arg_list[5],
arg_list[6], arg_list[7], arg_list[8],
arg_list[9], arg_list[10]);
break;
case 12:
cmds[i].options[j].fn(dcb, arg_list[0], arg_list[1], arg_list[2],
arg_list[3], arg_list[4], arg_list[5],
arg_list[6], arg_list[7], arg_list[8],
arg_list[9], arg_list[10], arg_list[11]);
break;
default:
dcb_printf(dcb, "Error: Maximum argument count is %d.\n", MAXARGS);
break;
}
}
}
}