Zendesk's Maxwell Compatibility (#119)

* Binlog router: Introduce maxwell_compatibility flag

* Binlog router: Handle 'server vars' query

This is a step towards using MaxScale with Zendesk's Maxwell.

* Binlog router: Handle results charset query

* Binlog router: Handle sql_mode query

* Binlog router: Handle server_id query

* Binlog router: Handle 'binlog vars' queries

* Binlog router: Handle @@lower_case_table_names query

* Binlog router: Handle @@global.binlog_checksum query

* Binlog router: DRY Maxwell SQL queries
This commit is contained in:
Adam Szkoda
2017-02-13 16:40:01 +01:00
committed by MassimilianoPinto
parent fdc66c660c
commit 825782799f
6 changed files with 300 additions and 39 deletions

View File

@ -185,6 +185,7 @@ MXS_MODULE* MXS_CREATE_MODULE()
{"master_version", MXS_MODULE_PARAM_STRING},
{"master_hostname", MXS_MODULE_PARAM_STRING},
{"mariadb10-compatibility", MXS_MODULE_PARAM_BOOL, "false"},
{"maxwell-compatibility", MXS_MODULE_PARAM_BOOL, "false"},
{"filestem", MXS_MODULE_PARAM_STRING, BINLOG_NAME_ROOT},
{"file", MXS_MODULE_PARAM_COUNT, "1"},
{"transaction_safety", MXS_MODULE_PARAM_BOOL, "false"},
@ -331,6 +332,7 @@ createInstance(SERVICE *service, char **options)
inst->heartbeat = config_get_integer(params, "heartbeat");
inst->ssl_cert_verification_depth = config_get_integer(params, "ssl_cert_verification_depth");
inst->mariadb10_compat = config_get_bool(params, "mariadb10-compatibility");
inst->maxwell_compat = config_get_bool(params, "maxwell-compatibility");
inst->trx_safe = config_get_bool(params, "transaction_safety");
inst->set_master_version = config_copy_string(params, "master_version");
inst->set_master_hostname = config_copy_string(params, "master_hostname");
@ -486,6 +488,10 @@ createInstance(SERVICE *service, char **options)
{
inst->mariadb10_compat = config_truth_value(value);
}
else if (strcmp(options[i], "maxwell-compatibility") == 0)
{
inst->maxwell_compat = config_truth_value(value);
}
else if (strcmp(options[i], "filestem") == 0)
{
MXS_FREE(inst->fileroot);

View File

@ -248,6 +248,10 @@ enum blr_aes_mode
#define MARIADB_FL_DDL 32
#define MARIADB_FL_STANDALONE 1
/* Maxwell-related SQL queries */
#define MYSQL_CONNECTOR_SERVER_VARS_QUERY "SELECT @@session.auto_increment_increment AS auto_increment_increment, @@character_set_client AS character_set_client, @@character_set_connection AS character_set_connection, @@character_set_results AS character_set_results, @@character_set_server AS character_set_server, @@init_connect AS init_connect, @@interactive_timeout AS interactive_timeout, @@license AS license, @@lower_case_table_names AS lower_case_table_names, @@max_allowed_packet AS max_allowed_packet, @@net_buffer_length AS net_buffer_length, @@net_write_timeout AS net_write_timeout, @@query_cache_size AS query_cache_size, @@query_cache_type AS query_cache_type, @@sql_mode AS sql_mode, @@system_time_zone AS system_time_zone, @@time_zone AS time_zone, @@tx_isolation AS tx_isolation, @@wait_timeout AS wait_timeout"
#define MYSQL_CONNECTOR_SQL_MODE_QUERY "SET sql_mode='NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION,STRICT_TRANS_TABLES'"
/* Saved credential file name's tail */
static const char BLR_DBUSERS_DIR[] = "cache/users";
static const char BLR_DBUSERS_FILE[] = "dbusers";
@ -494,6 +498,9 @@ typedef struct
GWBUF *selecthostname; /*< select @@hostname */
GWBUF *map; /*< select @@max_allowed_packet */
GWBUF *mariadb10; /*< set @mariadb_slave_capability */
GWBUF *server_vars; /*< MySQL Connector master server variables */
GWBUF *binlog_vars; /*< SELECT @@global.log_bin, @@global.binlog_format, @@global.binlog_row_image; */
GWBUF *lower_case_tables; /*< select @@lower_case_table_names */
uint8_t *fde_event; /*< Format Description Event */
int fde_len; /*< Length of fde_event */
} MASTER_RESPONSES;
@ -528,6 +535,7 @@ typedef struct router_instance
char *fileroot; /*< Root of binlog filename */
bool master_chksum; /*< Does the master provide checksums */
bool mariadb10_compat; /*< MariaDB 10.0 compatibility */
bool maxwell_compat; /*< Zendesk's Maxwell compatibility */
char *master_uuid; /*< Set UUID of the master, sent to slaves */
DCB *master; /*< DCB for master connection */
DCB *client; /*< DCB for dummy client */
@ -664,29 +672,58 @@ typedef struct binlog_encryption_ctx
#define BLRM_SUUID 0x000C
#define BLRM_LATIN1 0x000D
#define BLRM_UTF8 0x000E
#define BLRM_SELECT1 0x000F
#define BLRM_SELECTVER 0x0010
#define BLRM_SELECTVERCOM 0x0011
#define BLRM_SELECTHOSTNAME 0x0012
#define BLRM_MAP 0x0013
#define BLRM_REGISTER 0x0014
#define BLRM_CHECK_SEMISYNC 0x0015
#define BLRM_REQUEST_SEMISYNC 0x0016
#define BLRM_REQUEST_BINLOGDUMP 0x0017
#define BLRM_BINLOGDUMP 0x0018
#define BLRM_SLAVE_STOPPED 0x0019
#define BLRM_RESULTS_CHARSET 0x000F
#define BLRM_SQL_MODE 0x0010
#define BLRM_SELECT1 0x0011
#define BLRM_SELECTVER 0x0012
#define BLRM_SELECTVERCOM 0x0013
#define BLRM_SELECTHOSTNAME 0x0014
#define BLRM_MAP 0x0015
#define BLRM_SERVER_VARS 0x0016
#define BLRM_BINLOG_VARS 0x0017
#define BLRM_LOWER_CASE_TABLES 0x0018
#define BLRM_REGISTER 0x0019
#define BLRM_CHECK_SEMISYNC 0x001A
#define BLRM_REQUEST_SEMISYNC 0x001B
#define BLRM_REQUEST_BINLOGDUMP 0x001C
#define BLRM_BINLOGDUMP 0x001D
#define BLRM_SLAVE_STOPPED 0x001E
#define BLRM_MAXSTATE 0x0019
#define BLRM_MAXSTATE 0x001E
static char *blrm_states[] =
{
"Unconfigured", "Unconnected", "Connecting", "Authenticated", "Timestamp retrieval",
"Server ID retrieval", "HeartBeat Period setup", "binlog checksum config",
"binlog checksum rerieval", "Set MariaDB slave capability", "GTID Mode retrieval",
"Master UUID retrieval", "Set Slave UUID", "Set Names latin1", "Set Names utf8", "select 1",
"select version()", "select @@version_comment", "select @@hostname",
"select @@max_allowed_packet", "Register slave", "Semi-Sync Support retrivial",
"Request Semi-Sync Replication", "Request Binlog Dump", "Binlog Dump", "Slave stopped"
"Unconfigured",
"Unconnected",
"Connecting",
"Authenticated",
"Timestamp retrieval",
"Server ID retrieval",
"HeartBeat Period setup",
"binlog checksum config",
"binlog checksum rerieval",
"Set MariaDB slave capability",
"GTID Mode retrieval",
"Master UUID retrieval",
"Set Slave UUID",
"Set Names latin1",
"Set Names utf8",
"Set results charset null",
"Set sql_mode",
"select 1",
"select version()",
"select @@version_comment",
"select @@hostname",
"select @@max_allowed_packet",
"Query server variables",
"Query binlog variables",
"Query @@lower_case_table_names",
"Register slave",
"Semi-Sync Support retrivial",
"Request Semi-Sync Replication",
"Request Binlog Dump",
"Binlog Dump",
"Slave stopped"
};
#define BLRS_CREATED 0x0000
@ -787,6 +824,7 @@ extern const char *blr_get_encryption_algorithm(int);
extern int blr_check_encryption_algorithm(char *);
extern const char *blr_encryption_algorithm_list(void);
extern bool blr_get_encryption_key(ROUTER_INSTANCE *);
extern const char *blr_skip_leading_sql_comments(const char *);
MXS_END_DECLS

View File

@ -2297,6 +2297,9 @@ blr_cache_read_master_data(ROUTER_INSTANCE *router)
router->saved_master.selecthostname = blr_cache_read_response(router, "selecthostname");
router->saved_master.map = blr_cache_read_response(router, "map");
router->saved_master.mariadb10 = blr_cache_read_response(router, "mariadb10");
router->saved_master.server_vars = blr_cache_read_response(router, "server_vars");
router->saved_master.binlog_vars = blr_cache_read_response(router, "binlog_vars");
router->saved_master.lower_case_tables = blr_cache_read_response(router, "lower_case_tables");
}
/**

View File

@ -637,6 +637,30 @@ blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf)
}
router->saved_master.utf8 = buf;
blr_cache_response(router, "utf8", buf);
if (router->maxwell_compat)
{
buf = blr_make_query(router->master, "SET character_set_results = NULL");
router->master_state = BLRM_RESULTS_CHARSET;
router->master->func.write(router->master, buf);
}
else
{
buf = blr_make_query(router->master, "SELECT 1");
router->master_state = BLRM_SELECT1;
router->master->func.write(router->master, buf);
}
break;
case BLRM_RESULTS_CHARSET:
gwbuf_free(buf);
buf = blr_make_query(router->master, MYSQL_CONNECTOR_SQL_MODE_QUERY);
router->master_state = BLRM_SQL_MODE;
router->master->func.write(router->master, buf);
break;
case BLRM_SQL_MODE:
gwbuf_free(buf);
buf = blr_make_query(router->master, "SELECT 1");
router->master_state = BLRM_SELECT1;
router->master->func.write(router->master, buf);
@ -697,6 +721,44 @@ blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf)
}
router->saved_master.map = buf;
blr_cache_response(router, "map", buf);
// Query for Server Variables
buf = blr_make_query(router->master, MYSQL_CONNECTOR_SERVER_VARS_QUERY);
router->master_state = BLRM_SERVER_VARS;
router->master->func.write(router->master, buf);
break;
case BLRM_SERVER_VARS:
if (router->saved_master.server_vars)
{
GWBUF_CONSUME_ALL(router->saved_master.server_vars);
}
router->saved_master.server_vars = buf;
blr_cache_response(router, "server_vars", buf);
buf = blr_make_query(router->master, "SELECT IF(@@global.log_bin, 'ON', 'OFF'), @@global.binlog_format, @@global.binlog_row_image");
router->master_state = BLRM_BINLOG_VARS;
router->master->func.write(router->master, buf);
break;
case BLRM_BINLOG_VARS:
if (router->saved_master.binlog_vars)
{
GWBUF_CONSUME_ALL(router->saved_master.binlog_vars);
}
router->saved_master.binlog_vars = buf;
blr_cache_response(router, "binlog_vars", buf);
buf = blr_make_query(router->master, "select @@lower_case_table_names");
router->master_state = BLRM_LOWER_CASE_TABLES;
router->master->func.write(router->master, buf);
break;
case BLRM_LOWER_CASE_TABLES:
if (router->saved_master.lower_case_tables)
{
GWBUF_CONSUME_ALL(router->saved_master.lower_case_tables);
}
router->saved_master.lower_case_tables = buf;
blr_cache_response(router, "lower_case_tables", buf);
buf = blr_make_registration(router);
router->master_state = BLRM_REGISTER;
router->master->func.write(router->master, buf);

View File

@ -282,6 +282,46 @@ blr_slave_request(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
return 0;
}
/*
* Return a pointer to where the actual SQL query starts, skipping initial
* comments and whitespace characters, if there are any.
*/
const char *
blr_skip_leading_sql_comments(const char *sql_query)
{
const char *p = sql_query;
while (*p) {
if (*p == '/' && p[1] == '*')
{
++p; // skip '/'
++p; // skip '*'
while (*p)
{
if (*p == '*' && p[1] == '/')
{
++p; // skip '*'
++p; // skip '/'
break;
}
else
{
++p;
}
}
}
else if (isspace(*p))
{
++p;
}
else
{
return p;
}
}
return p;
}
/**
* Handle a query from the slave. This is expected to be one of the "standard"
* queries we expect as part of the registraton process. Most of these can
@ -350,6 +390,13 @@ blr_slave_query(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
char *ptr;
extern char *strcasestr();
bool unexpected = true;
static const char mysql_connector_results_charset_query[] = "SET character_set_results = NULL";
static const char maxwell_server_id_query[] = "SELECT @@server_id as server_id";
static const char maxwell_log_bin_query[] = "SHOW VARIABLES LIKE 'log_bin'";
static const char maxwell_binlog_format_query[] = "SHOW VARIABLES LIKE 'binlog_format'";
static const char maxwell_binlog_row_image_query[] = "SHOW VARIABLES LIKE 'binlog_row_image'";
static const char maxwell_lower_case_tables_query[] = "select @@lower_case_table_names";
qtext = (char*)GWBUF_DATA(queue);
query_len = extract_field((uint8_t *)qtext, 24) - 1;
@ -396,7 +443,68 @@ blr_slave_query(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
* own interaction with the real master. We simply replay these saved responses
* to the slave.
*/
if ((word = strtok_r(query_text, sep, &brkb)) == NULL)
if (strcmp(blr_skip_leading_sql_comments(query_text), MYSQL_CONNECTOR_SERVER_VARS_QUERY) == 0)
{
int rc = blr_slave_replay(router, slave, router->saved_master.server_vars);
if (rc >= 0)
{
MXS_FREE(query_text);
return 1;
}
MXS_ERROR("Error sending mysql-connector-j server variables");
}
else if (router->maxwell_compat && strcmp(query_text, mysql_connector_results_charset_query) == 0)
{
MXS_FREE(query_text);
return blr_slave_send_ok(router, slave);
}
else if (router->maxwell_compat && strcmp(query_text, MYSQL_CONNECTOR_SQL_MODE_QUERY) == 0)
{
MXS_FREE(query_text);
return blr_slave_send_ok(router, slave);
}
else if (strcmp(query_text, maxwell_server_id_query) == 0)
{
char server_id[40];
sprintf(server_id, "%d", router->masterid);
MXS_FREE(query_text);
return blr_slave_send_var_value(router, slave, "server_id", server_id, BLR_TYPE_STRING);
}
else if (strcmp(query_text, maxwell_log_bin_query) == 0)
{
char *log_bin = blr_extract_column(router->saved_master.binlog_vars, 1);
blr_slave_send_var_value(router, slave, "Value", log_bin == NULL ? "" : log_bin, BLR_TYPE_STRING);
MXS_FREE(log_bin);
MXS_FREE(query_text);
return 1;
}
else if (strcmp(query_text, maxwell_binlog_format_query) == 0)
{
char *binlog_format = blr_extract_column(router->saved_master.binlog_vars, 2);
blr_slave_send_var_value(router, slave, "Value", binlog_format == NULL ? "" : binlog_format, BLR_TYPE_STRING);
MXS_FREE(binlog_format);
MXS_FREE(query_text);
return 1;
}
else if (strcmp(query_text, maxwell_binlog_row_image_query) == 0)
{
char *binlog_row_image = blr_extract_column(router->saved_master.binlog_vars, 1);
blr_slave_send_var_value(router, slave, "Value", binlog_row_image == NULL ? "" : binlog_row_image, BLR_TYPE_STRING);
MXS_FREE(binlog_row_image);
MXS_FREE(query_text);
return 1;
}
else if (strcmp(query_text, maxwell_lower_case_tables_query) == 0)
{
int rc = blr_slave_replay(router, slave, router->saved_master.lower_case_tables);
if (rc >= 0)
{
MXS_FREE(query_text);
return 1;
}
MXS_ERROR("Error sending lower_case_tables query response");
}
else if ((word = strtok_r(query_text, sep, &brkb)) == NULL)
{
MXS_ERROR("%s: Incomplete query.", router->service->name);
}
@ -411,7 +519,7 @@ blr_slave_query(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
MXS_FREE(query_text);
return blr_slave_send_timestamp(router, slave);
}
else if (strcasecmp(word, "@master_binlog_checksum") == 0)
else if (strcasecmp(word, "@master_binlog_checksum") == 0 || strcasecmp(word, "@@global.binlog_checksum") == 0)
{
MXS_FREE(query_text);
return blr_slave_replay(router, slave, router->saved_master.chksum2);

View File

@ -700,10 +700,54 @@ int main(int argc, char **argv)
return 1;
}
tests++;
/**
* Verify SQL query initial comment skipping function works on a real use case.
*/
const char *mysql_connector_j_actual = blr_skip_leading_sql_comments("/* mysql-connector-java-5.1.39 ( Revision: 3289a357af6d09ecc1a10fd3c26e95183e5790ad ) */SELECT @@session.auto_increment_increment AS auto_increment_increment, @@character_set_client AS character_set_client, @@character_set_connection AS character_set_connection, @@character_set_results AS character_set_results, @@character_set_server AS character_set_server, @@init_connect AS init_connect, @@interactive_timeout AS interactive_timeout, @@license AS license, @@lower_case_table_names AS lower_case_table_names, @@max_allowed_packet AS max_allowed_packet, @@net_buffer_length AS net_buffer_length, @@net_write_timeout AS net_write_timeout, @@query_cache_size AS query_cache_size, @@query_cache_type AS query_cache_type, @@sql_mode AS sql_mode, @@system_time_zone AS system_time_zone, @@time_zone AS time_zone, @@tx_isolation AS tx_isolation, @@wait_timeout AS wait_timeout");
const char *mysql_connector_j_expected = "SELECT @@session.auto_increment_increment AS auto_increment_increment, @@character_set_client AS character_set_client, @@character_set_connection AS character_set_connection, @@character_set_results AS character_set_results, @@character_set_server AS character_set_server, @@init_connect AS init_connect, @@interactive_timeout AS interactive_timeout, @@license AS license, @@lower_case_table_names AS lower_case_table_names, @@max_allowed_packet AS max_allowed_packet, @@net_buffer_length AS net_buffer_length, @@net_write_timeout AS net_write_timeout, @@query_cache_size AS query_cache_size, @@query_cache_type AS query_cache_type, @@sql_mode AS sql_mode, @@system_time_zone AS system_time_zone, @@time_zone AS time_zone, @@tx_isolation AS tx_isolation, @@wait_timeout AS wait_timeout";
if (strcmp(mysql_connector_j_actual, mysql_connector_j_expected) == 0)
{
printf("Test %d PASSED", tests);
}
else
{
printf("Test %d FAILED: Actual result: %s", tests, mysql_connector_j_actual);
return 1;
}
tests++;
const char *no_comment_query_actual = blr_skip_leading_sql_comments("SELECT foo FROM bar LIMIT 1");
const char *no_comment_query_expected = "SELECT foo FROM bar LIMIT 1";
if (strcmp(no_comment_query_actual, no_comment_query_expected) == 0)
{
printf("Test %d PASSED", tests);
}
else
{
printf("Test %d FAILED: Actual result: %s", tests, no_comment_query_actual);
return 1;
}
tests++;
const char *unclosed_comment_query_actual = blr_skip_leading_sql_comments("/* SELECT foo FROM bar LIMIT 1");
const char *unclosed_comment_query_expected = "";
if (strcmp(unclosed_comment_query_actual, unclosed_comment_query_expected) == 0)
{
printf("Test %d PASSED", tests);
}
else
{
printf("Test %d FAILED: Actual result: %s", tests, no_comment_query_actual);
return 1;
}
mxs_log_flush_sync();
mxs_log_finish();
MXS_FREE(inst);
return 0;
}