Use protocol module to process packets
The backend protocol module can be requested to provide complete and contiguous packets to the router module. This removes the need to process the packets in binlogrouter.
This commit is contained in:
@ -1771,7 +1771,7 @@ static void rses_end_locked_router_action(ROUTER_SLAVE *rses)
|
||||
|
||||
static uint64_t getCapabilities(void)
|
||||
{
|
||||
return RCAP_TYPE_NO_RSESSION;
|
||||
return RCAP_TYPE_NO_RSESSION | RCAP_TYPE_CONTIGUOUS_OUTPUT;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -995,12 +995,10 @@ encode_value(unsigned char *data, unsigned int value, int len)
|
||||
void
|
||||
blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
|
||||
{
|
||||
uint8_t *msg = NULL, *ptr, *pdata;
|
||||
uint8_t *msg = NULL, *ptr;
|
||||
REP_HEADER hdr;
|
||||
unsigned int len = 0, reslen;
|
||||
unsigned int len = 0;
|
||||
unsigned int pkt_length;
|
||||
int no_residual = 1;
|
||||
int preslen = -1;
|
||||
int prev_length = -1;
|
||||
int n_bufs = -1, pn_bufs = -1;
|
||||
int check_packet_len;
|
||||
@ -1015,7 +1013,6 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
|
||||
{
|
||||
pkt = gwbuf_append(router->residual, pkt);
|
||||
router->residual = NULL;
|
||||
no_residual = 0;
|
||||
}
|
||||
|
||||
pkt_length = gwbuf_length(pkt);
|
||||
@ -1026,103 +1023,8 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
|
||||
*/
|
||||
while (pkt && pkt_length > 24)
|
||||
{
|
||||
reslen = GWBUF_LENGTH(pkt);
|
||||
pdata = GWBUF_DATA(pkt);
|
||||
if (reslen < 3) // Payload length straddles buffers
|
||||
{
|
||||
/* Get the length of the packet from the residual and new packet */
|
||||
if (reslen >= 3)
|
||||
{
|
||||
len = EXTRACT24(pdata);
|
||||
}
|
||||
else if (reslen == 2)
|
||||
{
|
||||
len = EXTRACT16(pdata);
|
||||
len |= (*(uint8_t *)GWBUF_DATA(pkt->next) << 16);
|
||||
}
|
||||
else if (reslen == 1)
|
||||
{
|
||||
len = *pdata;
|
||||
len |= (EXTRACT16(GWBUF_DATA(pkt->next)) << 8);
|
||||
}
|
||||
len += 4; // Allow space for the header
|
||||
}
|
||||
else
|
||||
{
|
||||
len = EXTRACT24(pdata) + 4;
|
||||
}
|
||||
/* len is now the payload length for the packet we are working on */
|
||||
|
||||
if (reslen < len && pkt_length >= len)
|
||||
{
|
||||
/*
|
||||
* The message is contained in more than the current
|
||||
* buffer, however we have the complete messasge in
|
||||
* this buffer and the chain of remaining buffers.
|
||||
*
|
||||
* Allocate a contiguous buffer for the binlog message
|
||||
* and copy the complete message into this buffer.
|
||||
*/
|
||||
int msg_remainder = len;
|
||||
GWBUF *p = pkt;
|
||||
|
||||
if ((msg = MXS_MALLOC(len)) == NULL)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
n_bufs = 0;
|
||||
ptr = msg;
|
||||
while (p && msg_remainder > 0)
|
||||
{
|
||||
int plen = GWBUF_LENGTH(p);
|
||||
int n = (msg_remainder > plen ? plen : msg_remainder);
|
||||
memcpy(ptr, GWBUF_DATA(p), n);
|
||||
msg_remainder -= n;
|
||||
ptr += n;
|
||||
if (msg_remainder > 0)
|
||||
{
|
||||
p = p->next;
|
||||
}
|
||||
n_bufs++;
|
||||
}
|
||||
if (msg_remainder)
|
||||
{
|
||||
MXS_ERROR("Expected entire message in buffer "
|
||||
"chain, but failed to create complete "
|
||||
"message as expected. %s @ %lu",
|
||||
router->binlog_name,
|
||||
router->current_pos);
|
||||
MXS_FREE(msg);
|
||||
/* msg = NULL; Not needed unless msg will be referred to again */
|
||||
break;
|
||||
}
|
||||
|
||||
ptr = msg;
|
||||
}
|
||||
else if (reslen < len)
|
||||
{
|
||||
/*
|
||||
* The message is not fully contained in the current
|
||||
* and we do not have the complete message in the
|
||||
* buffer chain. Therefore we must stop processing
|
||||
* until we receive the next buffer.
|
||||
*/
|
||||
router->stats.n_residuals++;
|
||||
MXS_DEBUG("Residual data left after %lu records. %s @ %lu",
|
||||
router->stats.n_binlogs,
|
||||
router->binlog_name, router->current_pos);
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* The message is fully contained in the current buffer
|
||||
*/
|
||||
ptr = pdata;
|
||||
n_bufs = 1;
|
||||
}
|
||||
|
||||
ptr = GWBUF_DATA(pkt);
|
||||
len = gw_mysql_get_byte3(ptr);
|
||||
semisync_bytes = 0;
|
||||
|
||||
/*
|
||||
@ -1133,6 +1035,8 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
|
||||
|
||||
if (len < BINLOG_EVENT_HDR_LEN && router->master_event_state != BLR_EVENT_ONGOING)
|
||||
{
|
||||
// Dead code?
|
||||
|
||||
char *event_msg = "";
|
||||
|
||||
/* Packet is too small to be a binlog event */
|
||||
@ -1189,16 +1093,12 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
|
||||
(hdr.event_size + (check_packet_len - MYSQL_HEADER_LEN)) < MYSQL_PACKET_LENGTH_MAX)
|
||||
{
|
||||
MXS_ERROR("Packet length is %d, but event size is %d, "
|
||||
"binlog file %s position %lu "
|
||||
"reslen is %d and preslen is %d, "
|
||||
"length of previous event %d. %s",
|
||||
"binlog file %s position %lu, "
|
||||
"length of previous event %d.",
|
||||
len, hdr.event_size,
|
||||
router->binlog_name,
|
||||
router->current_pos,
|
||||
reslen, preslen, prev_length,
|
||||
(prev_length == -1 ?
|
||||
(no_residual ? "No residual data from previous call" :
|
||||
"Residual data from previous call") : ""));
|
||||
prev_length);
|
||||
|
||||
blr_log_packet(LOG_ERR, "Packet:", ptr, len);
|
||||
|
||||
@ -1774,7 +1674,7 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
|
||||
len -= n;
|
||||
pkt_length -= n;
|
||||
}
|
||||
preslen = reslen;
|
||||
|
||||
pn_bufs = n_bufs;
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user