Allow filters to be used with binlogrouter

Filters can now be used with the binlogrouter to modify the contents of
the binlog stream now that the correct function call is used.
This commit is contained in:
Markus Mäkelä 2017-07-07 10:37:30 +03:00
parent 54c6e0eb52
commit 96b98845a2
3 changed files with 36 additions and 40 deletions

View File

@ -2531,7 +2531,7 @@ blr_statistics(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
*ptr++ = 1;
memcpy(ptr, result, len);
return slave->dcb->func.write(slave->dcb, ret);
return MXS_SESSION_ROUTE_REPLY(slave->dcb->session, ret);
}
/**
@ -2558,7 +2558,7 @@ blr_ping(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
*ptr++ = 1;
*ptr = 0; // OK
return slave->dcb->func.write(slave->dcb, ret);
return MXS_SESSION_ROUTE_REPLY(slave->dcb->session, ret);
}
@ -2665,7 +2665,7 @@ blr_send_custom_error(DCB *dcb,
/** write error message */
memcpy(mysql_payload, mysql_error_msg, strlen(mysql_error_msg));
return dcb->func.write(dcb, errbuf);
return MXS_SESSION_ROUTE_REPLY(dcb->session, errbuf);
}
/**

View File

@ -1974,7 +1974,7 @@ bool blr_send_packet(ROUTER_SLAVE *slave, uint8_t *buf, uint32_t len, bool first
}
slave->stats.n_bytes += GWBUF_LENGTH(buffer);
slave->dcb->func.write(slave->dcb, buffer);
MXS_SESSION_ROUTE_REPLY(slave->dcb->session, buffer);
}
else
{

View File

@ -790,7 +790,7 @@ blr_slave_replay(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *master)
if ((clone = gwbuf_clone(master)) != NULL)
{
return slave->dcb->func.write(slave->dcb, clone);
return MXS_SESSION_ROUTE_REPLY(slave->dcb->session, clone);
}
else
{
@ -826,7 +826,7 @@ blr_slave_send_error(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, char *msg)
encode_value(&data[5], 1064, 16);// Error Code
memcpy((char *)&data[7], "#42000", 6);
memcpy(&data[13], msg, strlen(msg)); // Error Message
slave->dcb->func.write(slave->dcb, pkt);
MXS_SESSION_ROUTE_REPLY(slave->dcb->session, pkt);
}
/*
@ -888,7 +888,7 @@ blr_slave_send_timestamp(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave)
ptr += ts_len;
// EOF packet to terminate result
memcpy(ptr, timestamp_eof, sizeof(timestamp_eof));
return slave->dcb->func.write(slave->dcb, pkt);
return MXS_SESSION_ROUTE_REPLY(slave->dcb->session, pkt);
}
/**
@ -929,7 +929,7 @@ blr_slave_send_maxscale_version(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave)
*ptr++ = vers_len; // Length of result string
memcpy((char *)ptr, version, vers_len); // Result string
/* ptr += vers_len; Not required unless more data is to be added */
slave->dcb->func.write(slave->dcb, pkt);
MXS_SESSION_ROUTE_REPLY(slave->dcb->session, pkt);
return blr_slave_send_eof(router, slave, 5);
}
@ -971,7 +971,7 @@ blr_slave_send_server_id(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave)
*ptr++ = id_len; // Length of result string
memcpy((char *)ptr, server_id, id_len); // Result string
/* ptr += id_len; Not required unless more data is to be added */
slave->dcb->func.write(slave->dcb, pkt);
MXS_SESSION_ROUTE_REPLY(slave->dcb->session, pkt);
return blr_slave_send_eof(router, slave, 5);
}
@ -1026,8 +1026,7 @@ blr_slave_send_maxscale_variables(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave)
*ptr++ = vers_len; // Length of result string
memcpy((char *)ptr, version, vers_len); // Result string
/* ptr += vers_len; Not required unless more data is to be added */
slave->dcb->func.write(slave->dcb, pkt);
MXS_SESSION_ROUTE_REPLY(slave->dcb->session, pkt);
return blr_slave_send_eof(router, slave, seqno++);
}
@ -1086,7 +1085,7 @@ blr_slave_send_master_status(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave)
*ptr++ = 0; // Send 3 empty values
*ptr++ = 0;
*ptr++ = 0;
slave->dcb->func.write(slave->dcb, pkt);
MXS_SESSION_ROUTE_REPLY(slave->dcb->session, pkt);
return blr_slave_send_eof(router, slave, 9);
}
@ -1664,7 +1663,7 @@ blr_slave_send_slave_status(ROUTER_INSTANCE *router,
// Trim the buffer to the actual size
pkt = gwbuf_rtrim(pkt, len - actual_len);
slave->dcb->func.write(slave->dcb, pkt);
MXS_SESSION_ROUTE_REPLY(slave->dcb->session, pkt);
return blr_slave_send_eof(router, slave, seqno++);
}
@ -1733,7 +1732,7 @@ blr_slave_send_slave_hosts(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave)
*ptr++ = strlen(slave_uuid); // Length of result string
memcpy((char *)ptr, slave_uuid, strlen(slave_uuid)); // Result string
ptr += strlen(slave_uuid);
slave->dcb->func.write(slave->dcb, pkt);
MXS_SESSION_ROUTE_REPLY(slave->dcb->session, pkt);
}
sptr = sptr->next;
}
@ -2838,7 +2837,7 @@ blr_slave_fake_rotate(ROUTER_INSTANCE *router,
new_file,
router->masterid);
return r_event ? slave->dcb->func.write(slave->dcb, r_event) : 0;
return r_event ? MXS_SESSION_ROUTE_REPLY(slave->dcb->session, r_event) : 0;
}
/**
@ -2955,7 +2954,7 @@ blr_slave_send_fde(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *fde)
event_size - BINLOG_EVENT_CRC_SIZE);
encode_value(ptr, chksum, 32);
return slave->dcb->func.write(slave->dcb, head);
return MXS_SESSION_ROUTE_REPLY(slave->dcb->session, head);
}
@ -2984,7 +2983,7 @@ blr_slave_send_fieldcount(ROUTER_INSTANCE *router,
ptr += 3;
*ptr++ = 0x01; // Sequence number in response
*ptr++ = count; // Number of columns
return slave->dcb->func.write(slave->dcb, pkt);
return MXS_SESSION_ROUTE_REPLY(slave->dcb->session, pkt);
}
@ -3049,7 +3048,7 @@ blr_slave_send_columndef(ROUTER_INSTANCE *router,
*ptr++ = 0;
*ptr++ = 0;
*ptr++ = 0;
return slave->dcb->func.write(slave->dcb, pkt);
return MXS_SESSION_ROUTE_REPLY(slave->dcb->session, pkt);
}
@ -3079,7 +3078,7 @@ blr_slave_send_eof(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, int seqno)
encode_value(ptr, 0, 16); // No errors
ptr += 2;
encode_value(ptr, 2, 16); // Autocommit enabled
return slave->dcb->func.write(slave->dcb, pkt);
return MXS_SESSION_ROUTE_REPLY(slave->dcb->session, pkt);
}
/**
@ -3147,8 +3146,7 @@ blr_slave_send_disconnected_server(ROUTER_INSTANCE *router,
memcpy((char *)ptr, state, strlen(state)); // Result string
/* ptr += strlen(state); Not required unless more data is to be added */
slave->dcb->func.write(slave->dcb, pkt);
MXS_SESSION_ROUTE_REPLY(slave->dcb->session, pkt);
return blr_slave_send_eof(router, slave, seqno++);
}
@ -3317,7 +3315,7 @@ blr_slave_disconnect_all(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave)
memcpy((char *)ptr, state, strlen(state)); // Result string
ptr += strlen(state);
slave->dcb->func.write(slave->dcb, pkt);
MXS_SESSION_ROUTE_REPLY(slave->dcb->session, pkt);
sptr->state = BLRS_UNREGISTERED;
dcb_close(sptr->dcb);
@ -3361,7 +3359,7 @@ blr_slave_send_ok(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave)
memcpy(GWBUF_DATA(pkt), ok_packet, sizeof(ok_packet));
return slave->dcb->func.write(slave->dcb, pkt);
return MXS_SESSION_ROUTE_REPLY(slave->dcb->session, pkt);
}
/**
@ -3411,7 +3409,7 @@ blr_slave_send_ok_message(ROUTER_INSTANCE* router,
strcpy((char *)ptr, message);
}
return slave->dcb->func.write(slave->dcb, pkt);
return MXS_SESSION_ROUTE_REPLY(slave->dcb->session, pkt);
}
/**
@ -3774,7 +3772,7 @@ blr_slave_send_error_packet(ROUTER_SLAVE *slave,
memcpy(&data[13], msg, strlen(msg)); // Error Message
slave->dcb->func.write(slave->dcb, pkt);
MXS_SESSION_ROUTE_REPLY(slave->dcb->session, pkt);
}
/**
@ -5144,7 +5142,7 @@ blr_slave_send_var_value(ROUTER_INSTANCE *router,
memcpy((char *)ptr, value, vers_len); // Result string
/* ptr += vers_len; Not required unless more data is added */
slave->dcb->func.write(slave->dcb, pkt);
MXS_SESSION_ROUTE_REPLY(slave->dcb->session, pkt);
return blr_slave_send_eof(router, slave, 5);
}
@ -5229,7 +5227,7 @@ blr_slave_send_variable(ROUTER_INSTANCE *router,
*ptr++ = vers_len; // Length of result string
memcpy((char *)ptr, value, vers_len); // Result string with var value
/* ptr += vers_len; Not required unless more data is added */
slave->dcb->func.write(slave->dcb, pkt);
MXS_SESSION_ROUTE_REPLY(slave->dcb->session, pkt);
MXS_FREE(old_ptr);
@ -5318,7 +5316,7 @@ blr_slave_send_columndef_with_info_schema(ROUTER_INSTANCE *router,
*ptr++ = 0;
*ptr++ = 0;
return slave->dcb->func.write(slave->dcb, pkt);
return MXS_SESSION_ROUTE_REPLY(slave->dcb->session, pkt);
}
/**
@ -5513,7 +5511,7 @@ blr_slave_send_warning_message(ROUTER_INSTANCE* router,
}
slave->warning_msg = MXS_STRDUP_A(message);
return slave->dcb->func.write(slave->dcb, pkt);
return MXS_SESSION_ROUTE_REPLY(slave->dcb->session, pkt);
}
/**
@ -5605,7 +5603,7 @@ blr_slave_show_warnings(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave)
/* ptr += msg_len; Not required unless more data is added */
}
slave->dcb->func.write(slave->dcb, pkt);
MXS_SESSION_ROUTE_REPLY(slave->dcb->session, pkt);
return blr_slave_send_eof(router, slave, 7);
}
@ -5753,7 +5751,7 @@ blr_slave_send_status_variable(ROUTER_INSTANCE *router,
memcpy((char *)ptr, value, vers_len);
/* ptr += vers_len; Not required unless more data is added */
slave->dcb->func.write(slave->dcb, pkt);
MXS_SESSION_ROUTE_REPLY(slave->dcb->session, pkt);
MXS_FREE(old_ptr);
@ -5857,7 +5855,7 @@ blr_slave_send_columndef_with_status_schema(ROUTER_INSTANCE *router,
*ptr++ = 0;
*ptr++ = 0;
return slave->dcb->func.write(slave->dcb, pkt);
return MXS_SESSION_ROUTE_REPLY(slave->dcb->session, pkt);
}
/**
@ -5988,7 +5986,7 @@ blr_slave_send_heartbeat(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave)
}
/* Write the packet */
return slave->dcb->func.write(slave->dcb, h_event);
return MXS_SESSION_ROUTE_REPLY(slave->dcb->session, h_event);
}
/**
@ -6560,7 +6558,7 @@ static int blr_send_connect_fake_rotate(ROUTER_INSTANCE *router,
router->masterid);
/* Send Fake Rotate Event or return 0*/
return r_event ? slave->dcb->func.write(slave->dcb, r_event) : 0;
return r_event ? MXS_SESSION_ROUTE_REPLY(slave->dcb->session, r_event) : 0;
}
/**
@ -7018,7 +7016,7 @@ static int blr_send_fake_gtid_list(ROUTER_SLAVE *slave,
serverid);
/* Send Fake GTID_LIST Event or return 0*/
return gl_event ? slave->dcb->func.write(slave->dcb, gl_event) : 0;
return gl_event ? MXS_SESSION_ROUTE_REPLY(slave->dcb->session, gl_event) : 0;
}
/**
@ -8232,7 +8230,7 @@ blr_show_binary_logs(ROUTER_INSTANCE *router,
pos,
seqno)) != NULL)
{
slave->dcb->func.write(slave->dcb, pkt);
MXS_SESSION_ROUTE_REPLY(slave->dcb->session, pkt);
/* Increment sequence */
seqno++;
}
@ -8372,9 +8370,7 @@ static int binary_logs_select_cb(void *data,
/* Set last file name */
data_set->last_file = MXS_STRDUP_A(values[0]);
/* Write packet to client */
dcb->func.write(dcb, pkt);
/* Set success */
ret = 0;
ret = MXS_SESSION_ROUTE_REPLY(dcb->session, pkt);
}
return ret; /* Return success or fallure */
}
@ -8431,7 +8427,7 @@ static int blr_slave_send_id_ro(ROUTER_INSTANCE *router,
seqno++)) != NULL)
{
/* Write packet to client */
slave->dcb->func.write(slave->dcb, pkt);
MXS_SESSION_ROUTE_REPLY(slave->dcb->session, pkt);
}
/* Add the result set EOF and return */