Hack binlogrouter to work with resultset buffering

The binlogrouter needs to manipulate the protocol structures in order for
the resultset buffering to work correctly. If the state isn't manipulated
for COM_QUERY statements, the resultsets aren't buffered and will be
routed in separate buffers.
This commit is contained in:
Markus Makela 2016-12-12 11:56:41 +02:00
parent 03681db687
commit 5ccdfe54bd

View File

@ -83,7 +83,7 @@
static GWBUF *blr_make_query(char *statement);
static GWBUF *blr_make_query(DCB *dcb, char *query);
static GWBUF *blr_make_registration(ROUTER_INSTANCE *router);
static GWBUF *blr_make_binlog_dump(ROUTER_INSTANCE *router);
void encode_value(unsigned char *data, unsigned int value, int len);
@ -214,7 +214,7 @@ blr_start_master(void* data)
}
router->master_state = BLRM_AUTHENTICATED;
router->master->func.write(router->master, blr_make_query("SELECT UNIX_TIMESTAMP()"));
router->master->func.write(router->master, blr_make_query(router->master, "SELECT UNIX_TIMESTAMP()"));
router->master_state = BLRM_TIMESTAMP;
router->stats.n_masterstarts++;
@ -453,7 +453,7 @@ blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf)
case BLRM_TIMESTAMP:
// Response to a timestamp message, no need to save this.
gwbuf_free(buf);
buf = blr_make_query("SHOW VARIABLES LIKE 'SERVER_ID'");
buf = blr_make_query(router->master, "SHOW VARIABLES LIKE 'SERVER_ID'");
router->master_state = BLRM_SERVERID;
router->master->func.write(router->master, buf);
router->retry_backoff = 1;
@ -479,7 +479,7 @@ blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf)
{
char str[BLRM_SET_HEARTBEAT_QUERY_LEN];
sprintf(str, "SET @master_heartbeat_period = %lu000000000", router->heartbeat);
buf = blr_make_query(str);
buf = blr_make_query(router->master, str);
}
router->master_state = BLRM_HBPERIOD;
router->master->func.write(router->master, buf);
@ -494,7 +494,7 @@ blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf)
}
router->saved_master.heartbeat = buf;
blr_cache_response(router, "heartbeat", buf);
buf = blr_make_query("SET @master_binlog_checksum = @@global.binlog_checksum");
buf = blr_make_query(router->master, "SET @master_binlog_checksum = @@global.binlog_checksum");
router->master_state = BLRM_CHKSUM1;
router->master->func.write(router->master, buf);
break;
@ -506,7 +506,7 @@ blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf)
}
router->saved_master.chksum1 = buf;
blr_cache_response(router, "chksum1", buf);
buf = blr_make_query("SELECT @master_binlog_checksum");
buf = blr_make_query(router->master, "SELECT @master_binlog_checksum");
router->master_state = BLRM_CHKSUM2;
router->master->func.write(router->master, buf);
break;
@ -532,12 +532,12 @@ blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf)
if (router->mariadb10_compat)
{
buf = blr_make_query("SET @mariadb_slave_capability=4");
buf = blr_make_query(router->master, "SET @mariadb_slave_capability=4");
router->master_state = BLRM_MARIADB10;
}
else
{
buf = blr_make_query("SELECT @@GLOBAL.GTID_MODE");
buf = blr_make_query(router->master, "SELECT @@GLOBAL.GTID_MODE");
router->master_state = BLRM_GTIDMODE;
}
router->master->func.write(router->master, buf);
@ -551,7 +551,7 @@ blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf)
}
router->saved_master.mariadb10 = buf;
blr_cache_response(router, "mariadb10", buf);
buf = blr_make_query("SHOW VARIABLES LIKE 'SERVER_UUID'");
buf = blr_make_query(router->master, "SHOW VARIABLES LIKE 'SERVER_UUID'");
router->master_state = BLRM_MUUID;
router->master->func.write(router->master, buf);
break;
@ -563,7 +563,7 @@ blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf)
}
router->saved_master.gtid_mode = buf;
blr_cache_response(router, "gtidmode", buf);
buf = blr_make_query("SHOW VARIABLES LIKE 'SERVER_UUID'");
buf = blr_make_query(router->master, "SHOW VARIABLES LIKE 'SERVER_UUID'");
router->master_state = BLRM_MUUID;
router->master->func.write(router->master, buf);
break;
@ -601,7 +601,7 @@ blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf)
router->saved_master.uuid = buf;
blr_cache_response(router, "uuid", buf);
sprintf(query, "SET @slave_uuid='%s'", router->uuid);
buf = blr_make_query(query);
buf = blr_make_query(router->master, query);
router->master_state = BLRM_SUUID;
router->master->func.write(router->master, buf);
break;
@ -614,7 +614,7 @@ blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf)
}
router->saved_master.setslaveuuid = buf;
blr_cache_response(router, "ssuuid", buf);
buf = blr_make_query("SET NAMES latin1");
buf = blr_make_query(router->master, "SET NAMES latin1");
router->master_state = BLRM_LATIN1;
router->master->func.write(router->master, buf);
break;
@ -626,7 +626,7 @@ blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf)
}
router->saved_master.setnames = buf;
blr_cache_response(router, "setnames", buf);
buf = blr_make_query("SET NAMES utf8");
buf = blr_make_query(router->master, "SET NAMES utf8");
router->master_state = BLRM_UTF8;
router->master->func.write(router->master, buf);
break;
@ -638,7 +638,7 @@ blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf)
}
router->saved_master.utf8 = buf;
blr_cache_response(router, "utf8", buf);
buf = blr_make_query("SELECT 1");
buf = blr_make_query(router->master, "SELECT 1");
router->master_state = BLRM_SELECT1;
router->master->func.write(router->master, buf);
break;
@ -650,7 +650,7 @@ blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf)
}
router->saved_master.select1 = buf;
blr_cache_response(router, "select1", buf);
buf = blr_make_query("SELECT VERSION()");
buf = blr_make_query(router->master, "SELECT VERSION()");
router->master_state = BLRM_SELECTVER;
router->master->func.write(router->master, buf);
break;
@ -662,7 +662,7 @@ blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf)
}
router->saved_master.selectver = buf;
blr_cache_response(router, "selectver", buf);
buf = blr_make_query("SELECT @@version_comment limit 1");
buf = blr_make_query(router->master, "SELECT @@version_comment limit 1");
router->master_state = BLRM_SELECTVERCOM;
router->master->func.write(router->master, buf);
break;
@ -674,7 +674,7 @@ blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf)
}
router->saved_master.selectvercom = buf;
blr_cache_response(router, "selectvercom", buf);
buf = blr_make_query("SELECT @@hostname");
buf = blr_make_query(router->master, "SELECT @@hostname");
router->master_state = BLRM_SELECTHOSTNAME;
router->master->func.write(router->master, buf);
break;
@ -686,7 +686,7 @@ blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf)
}
router->saved_master.selecthostname = buf;
blr_cache_response(router, "selecthostname", buf);
buf = blr_make_query("SELECT @@max_allowed_packet");
buf = blr_make_query(router->master, "SELECT @@max_allowed_packet");
router->master_state = BLRM_MAP;
router->master->func.write(router->master, buf);
break;
@ -714,7 +714,7 @@ blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf)
router->service->dbref->server->name,
router->service->dbref->server->port);
buf = blr_make_query("SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled'");
buf = blr_make_query(router->master, "SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled'");
router->master_state = BLRM_CHECK_SEMISYNC;
router->master->func.write(router->master, buf);
@ -771,7 +771,7 @@ blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf)
router->service->dbref->server->port);
}
buf = blr_make_query("SET @rpl_semi_sync_slave = 1");
buf = blr_make_query(router->master, "SET @rpl_semi_sync_slave = 1");
router->master_state = BLRM_REQUEST_SEMISYNC;
router->master->func.write(router->master, buf);
@ -845,10 +845,13 @@ blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf)
/**
* Build a MySQL query into a GWBUF that we can send to the master database
*
* The data is not written to @c dcb but the expected protocol state is fixed.
*
* @param dcb The DCB where this will be written
* @param query The text of the query to send
*/
static GWBUF *
blr_make_query(char *query)
blr_make_query(DCB *dcb, char *query)
{
GWBUF *buf;
unsigned char *data;
@ -866,6 +869,10 @@ blr_make_query(char *query)
data[4] = COM_QUERY; // Command
memcpy(&data[5], query, strlen(query));
// This is hack to get the result set processing in order for binlogrouter
MySQLProtocol *proto = (MySQLProtocol*)dcb->protocol;
proto->current_command = MYSQL_COM_QUERY;
return buf;
}
@ -904,6 +911,10 @@ blr_make_registration(ROUTER_INSTANCE *router)
encode_value(&data[14], 0, 32); // Replication rank
encode_value(&data[18], router->masterid, 32); // Master server-id
// This is hack to get the result set processing in order for binlogrouter
MySQLProtocol *proto = (MySQLProtocol*)router->master->protocol;
proto->current_command = MYSQL_COM_REGISTER_SLAVE;
return buf;
}
@ -951,6 +962,11 @@ blr_make_binlog_dump(ROUTER_INSTANCE *router)
router->serverid, 32); // Server-id of MaxScale
memcpy((char *)&data[15], router->binlog_name,
binlog_file_len); // binlog filename
// This is hack to get the result set processing in order for binlogrouter
MySQLProtocol *proto = (MySQLProtocol*)router->master->protocol;
proto->current_command = MYSQL_COM_BINLOG_DUMP;
return buf;
}