From 96b98845a23a2d31b4931c7488faaf599d75a1a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Fri, 7 Jul 2017 10:37:30 +0300 Subject: [PATCH] Allow filters to be used with binlogrouter Filters can now be used with the binlogrouter to modify the contents of the binlog stream now that the correct function call is used. --- server/modules/routing/binlogrouter/blr.c | 6 +- .../modules/routing/binlogrouter/blr_master.c | 2 +- .../modules/routing/binlogrouter/blr_slave.c | 68 +++++++++---------- 3 files changed, 36 insertions(+), 40 deletions(-) diff --git a/server/modules/routing/binlogrouter/blr.c b/server/modules/routing/binlogrouter/blr.c index 375b80d29..0b7301050 100644 --- a/server/modules/routing/binlogrouter/blr.c +++ b/server/modules/routing/binlogrouter/blr.c @@ -2531,7 +2531,7 @@ blr_statistics(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue) *ptr++ = 1; memcpy(ptr, result, len); - return slave->dcb->func.write(slave->dcb, ret); + return MXS_SESSION_ROUTE_REPLY(slave->dcb->session, ret); } /** @@ -2558,7 +2558,7 @@ blr_ping(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue) *ptr++ = 1; *ptr = 0; // OK - return slave->dcb->func.write(slave->dcb, ret); + return MXS_SESSION_ROUTE_REPLY(slave->dcb->session, ret); } @@ -2665,7 +2665,7 @@ blr_send_custom_error(DCB *dcb, /** write error message */ memcpy(mysql_payload, mysql_error_msg, strlen(mysql_error_msg)); - return dcb->func.write(dcb, errbuf); + return MXS_SESSION_ROUTE_REPLY(dcb->session, errbuf); } /** diff --git a/server/modules/routing/binlogrouter/blr_master.c b/server/modules/routing/binlogrouter/blr_master.c index edbdd7a0a..6b5579f28 100644 --- a/server/modules/routing/binlogrouter/blr_master.c +++ b/server/modules/routing/binlogrouter/blr_master.c @@ -1974,7 +1974,7 @@ bool blr_send_packet(ROUTER_SLAVE *slave, uint8_t *buf, uint32_t len, bool first } slave->stats.n_bytes += GWBUF_LENGTH(buffer); - slave->dcb->func.write(slave->dcb, buffer); + MXS_SESSION_ROUTE_REPLY(slave->dcb->session, buffer); } else { diff --git a/server/modules/routing/binlogrouter/blr_slave.c b/server/modules/routing/binlogrouter/blr_slave.c index 9717eb7c8..1d7b16a47 100644 --- a/server/modules/routing/binlogrouter/blr_slave.c +++ b/server/modules/routing/binlogrouter/blr_slave.c @@ -790,7 +790,7 @@ blr_slave_replay(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *master) if ((clone = gwbuf_clone(master)) != NULL) { - return slave->dcb->func.write(slave->dcb, clone); + return MXS_SESSION_ROUTE_REPLY(slave->dcb->session, clone); } else { @@ -826,7 +826,7 @@ blr_slave_send_error(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, char *msg) encode_value(&data[5], 1064, 16);// Error Code memcpy((char *)&data[7], "#42000", 6); memcpy(&data[13], msg, strlen(msg)); // Error Message - slave->dcb->func.write(slave->dcb, pkt); + MXS_SESSION_ROUTE_REPLY(slave->dcb->session, pkt); } /* @@ -888,7 +888,7 @@ blr_slave_send_timestamp(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave) ptr += ts_len; // EOF packet to terminate result memcpy(ptr, timestamp_eof, sizeof(timestamp_eof)); - return slave->dcb->func.write(slave->dcb, pkt); + return MXS_SESSION_ROUTE_REPLY(slave->dcb->session, pkt); } /** @@ -929,7 +929,7 @@ blr_slave_send_maxscale_version(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave) *ptr++ = vers_len; // Length of result string memcpy((char *)ptr, version, vers_len); // Result string /* ptr += vers_len; Not required unless more data is to be added */ - slave->dcb->func.write(slave->dcb, pkt); + MXS_SESSION_ROUTE_REPLY(slave->dcb->session, pkt); return blr_slave_send_eof(router, slave, 5); } @@ -971,7 +971,7 @@ blr_slave_send_server_id(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave) *ptr++ = id_len; // Length of result string memcpy((char *)ptr, server_id, id_len); // Result string /* ptr += id_len; Not required unless more data is to be added */ - slave->dcb->func.write(slave->dcb, pkt); + MXS_SESSION_ROUTE_REPLY(slave->dcb->session, pkt); return blr_slave_send_eof(router, slave, 5); } @@ -1026,8 +1026,7 @@ blr_slave_send_maxscale_variables(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave) *ptr++ = vers_len; // Length of result string memcpy((char *)ptr, version, vers_len); // Result string /* ptr += vers_len; Not required unless more data is to be added */ - slave->dcb->func.write(slave->dcb, pkt); - + MXS_SESSION_ROUTE_REPLY(slave->dcb->session, pkt); return blr_slave_send_eof(router, slave, seqno++); } @@ -1086,7 +1085,7 @@ blr_slave_send_master_status(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave) *ptr++ = 0; // Send 3 empty values *ptr++ = 0; *ptr++ = 0; - slave->dcb->func.write(slave->dcb, pkt); + MXS_SESSION_ROUTE_REPLY(slave->dcb->session, pkt); return blr_slave_send_eof(router, slave, 9); } @@ -1664,7 +1663,7 @@ blr_slave_send_slave_status(ROUTER_INSTANCE *router, // Trim the buffer to the actual size pkt = gwbuf_rtrim(pkt, len - actual_len); - slave->dcb->func.write(slave->dcb, pkt); + MXS_SESSION_ROUTE_REPLY(slave->dcb->session, pkt); return blr_slave_send_eof(router, slave, seqno++); } @@ -1733,7 +1732,7 @@ blr_slave_send_slave_hosts(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave) *ptr++ = strlen(slave_uuid); // Length of result string memcpy((char *)ptr, slave_uuid, strlen(slave_uuid)); // Result string ptr += strlen(slave_uuid); - slave->dcb->func.write(slave->dcb, pkt); + MXS_SESSION_ROUTE_REPLY(slave->dcb->session, pkt); } sptr = sptr->next; } @@ -2838,7 +2837,7 @@ blr_slave_fake_rotate(ROUTER_INSTANCE *router, new_file, router->masterid); - return r_event ? slave->dcb->func.write(slave->dcb, r_event) : 0; + return r_event ? MXS_SESSION_ROUTE_REPLY(slave->dcb->session, r_event) : 0; } /** @@ -2955,7 +2954,7 @@ blr_slave_send_fde(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *fde) event_size - BINLOG_EVENT_CRC_SIZE); encode_value(ptr, chksum, 32); - return slave->dcb->func.write(slave->dcb, head); + return MXS_SESSION_ROUTE_REPLY(slave->dcb->session, head); } @@ -2984,7 +2983,7 @@ blr_slave_send_fieldcount(ROUTER_INSTANCE *router, ptr += 3; *ptr++ = 0x01; // Sequence number in response *ptr++ = count; // Number of columns - return slave->dcb->func.write(slave->dcb, pkt); + return MXS_SESSION_ROUTE_REPLY(slave->dcb->session, pkt); } @@ -3049,7 +3048,7 @@ blr_slave_send_columndef(ROUTER_INSTANCE *router, *ptr++ = 0; *ptr++ = 0; *ptr++ = 0; - return slave->dcb->func.write(slave->dcb, pkt); + return MXS_SESSION_ROUTE_REPLY(slave->dcb->session, pkt); } @@ -3079,7 +3078,7 @@ blr_slave_send_eof(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, int seqno) encode_value(ptr, 0, 16); // No errors ptr += 2; encode_value(ptr, 2, 16); // Autocommit enabled - return slave->dcb->func.write(slave->dcb, pkt); + return MXS_SESSION_ROUTE_REPLY(slave->dcb->session, pkt); } /** @@ -3147,8 +3146,7 @@ blr_slave_send_disconnected_server(ROUTER_INSTANCE *router, memcpy((char *)ptr, state, strlen(state)); // Result string /* ptr += strlen(state); Not required unless more data is to be added */ - slave->dcb->func.write(slave->dcb, pkt); - + MXS_SESSION_ROUTE_REPLY(slave->dcb->session, pkt); return blr_slave_send_eof(router, slave, seqno++); } @@ -3317,7 +3315,7 @@ blr_slave_disconnect_all(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave) memcpy((char *)ptr, state, strlen(state)); // Result string ptr += strlen(state); - slave->dcb->func.write(slave->dcb, pkt); + MXS_SESSION_ROUTE_REPLY(slave->dcb->session, pkt); sptr->state = BLRS_UNREGISTERED; dcb_close(sptr->dcb); @@ -3361,7 +3359,7 @@ blr_slave_send_ok(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave) memcpy(GWBUF_DATA(pkt), ok_packet, sizeof(ok_packet)); - return slave->dcb->func.write(slave->dcb, pkt); + return MXS_SESSION_ROUTE_REPLY(slave->dcb->session, pkt); } /** @@ -3411,7 +3409,7 @@ blr_slave_send_ok_message(ROUTER_INSTANCE* router, strcpy((char *)ptr, message); } - return slave->dcb->func.write(slave->dcb, pkt); + return MXS_SESSION_ROUTE_REPLY(slave->dcb->session, pkt); } /** @@ -3774,7 +3772,7 @@ blr_slave_send_error_packet(ROUTER_SLAVE *slave, memcpy(&data[13], msg, strlen(msg)); // Error Message - slave->dcb->func.write(slave->dcb, pkt); + MXS_SESSION_ROUTE_REPLY(slave->dcb->session, pkt); } /** @@ -5144,7 +5142,7 @@ blr_slave_send_var_value(ROUTER_INSTANCE *router, memcpy((char *)ptr, value, vers_len); // Result string /* ptr += vers_len; Not required unless more data is added */ - slave->dcb->func.write(slave->dcb, pkt); + MXS_SESSION_ROUTE_REPLY(slave->dcb->session, pkt); return blr_slave_send_eof(router, slave, 5); } @@ -5229,7 +5227,7 @@ blr_slave_send_variable(ROUTER_INSTANCE *router, *ptr++ = vers_len; // Length of result string memcpy((char *)ptr, value, vers_len); // Result string with var value /* ptr += vers_len; Not required unless more data is added */ - slave->dcb->func.write(slave->dcb, pkt); + MXS_SESSION_ROUTE_REPLY(slave->dcb->session, pkt); MXS_FREE(old_ptr); @@ -5318,7 +5316,7 @@ blr_slave_send_columndef_with_info_schema(ROUTER_INSTANCE *router, *ptr++ = 0; *ptr++ = 0; - return slave->dcb->func.write(slave->dcb, pkt); + return MXS_SESSION_ROUTE_REPLY(slave->dcb->session, pkt); } /** @@ -5513,7 +5511,7 @@ blr_slave_send_warning_message(ROUTER_INSTANCE* router, } slave->warning_msg = MXS_STRDUP_A(message); - return slave->dcb->func.write(slave->dcb, pkt); + return MXS_SESSION_ROUTE_REPLY(slave->dcb->session, pkt); } /** @@ -5605,7 +5603,7 @@ blr_slave_show_warnings(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave) /* ptr += msg_len; Not required unless more data is added */ } - slave->dcb->func.write(slave->dcb, pkt); + MXS_SESSION_ROUTE_REPLY(slave->dcb->session, pkt); return blr_slave_send_eof(router, slave, 7); } @@ -5753,7 +5751,7 @@ blr_slave_send_status_variable(ROUTER_INSTANCE *router, memcpy((char *)ptr, value, vers_len); /* ptr += vers_len; Not required unless more data is added */ - slave->dcb->func.write(slave->dcb, pkt); + MXS_SESSION_ROUTE_REPLY(slave->dcb->session, pkt); MXS_FREE(old_ptr); @@ -5857,7 +5855,7 @@ blr_slave_send_columndef_with_status_schema(ROUTER_INSTANCE *router, *ptr++ = 0; *ptr++ = 0; - return slave->dcb->func.write(slave->dcb, pkt); + return MXS_SESSION_ROUTE_REPLY(slave->dcb->session, pkt); } /** @@ -5988,7 +5986,7 @@ blr_slave_send_heartbeat(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave) } /* Write the packet */ - return slave->dcb->func.write(slave->dcb, h_event); + return MXS_SESSION_ROUTE_REPLY(slave->dcb->session, h_event); } /** @@ -6560,7 +6558,7 @@ static int blr_send_connect_fake_rotate(ROUTER_INSTANCE *router, router->masterid); /* Send Fake Rotate Event or return 0*/ - return r_event ? slave->dcb->func.write(slave->dcb, r_event) : 0; + return r_event ? MXS_SESSION_ROUTE_REPLY(slave->dcb->session, r_event) : 0; } /** @@ -7018,7 +7016,7 @@ static int blr_send_fake_gtid_list(ROUTER_SLAVE *slave, serverid); /* Send Fake GTID_LIST Event or return 0*/ - return gl_event ? slave->dcb->func.write(slave->dcb, gl_event) : 0; + return gl_event ? MXS_SESSION_ROUTE_REPLY(slave->dcb->session, gl_event) : 0; } /** @@ -8232,7 +8230,7 @@ blr_show_binary_logs(ROUTER_INSTANCE *router, pos, seqno)) != NULL) { - slave->dcb->func.write(slave->dcb, pkt); + MXS_SESSION_ROUTE_REPLY(slave->dcb->session, pkt); /* Increment sequence */ seqno++; } @@ -8372,9 +8370,7 @@ static int binary_logs_select_cb(void *data, /* Set last file name */ data_set->last_file = MXS_STRDUP_A(values[0]); /* Write packet to client */ - dcb->func.write(dcb, pkt); - /* Set success */ - ret = 0; + ret = MXS_SESSION_ROUTE_REPLY(dcb->session, pkt); } return ret; /* Return success or fallure */ } @@ -8431,7 +8427,7 @@ static int blr_slave_send_id_ro(ROUTER_INSTANCE *router, seqno++)) != NULL) { /* Write packet to client */ - slave->dcb->func.write(slave->dcb, pkt); + MXS_SESSION_ROUTE_REPLY(slave->dcb->session, pkt); } /* Add the result set EOF and return */