Fixes to MySQL GTID handling. Still can't execute COM_BINLOG_DUMP_GTID.

This commit is contained in:
Jan Lindström
2013-07-12 12:30:24 +03:00
parent 59b0eee999
commit 427fee618a
10 changed files with 204 additions and 251 deletions

View File

@ -39,181 +39,181 @@ static char* server_groups[] = {
void* binlog_reader(void * arg)
{
replication_listener_t *rlt = (replication_listener_t*)arg;
char *uri = rlt->server_url;
map<int, string> tid2tname;
map<int, string>::iterator tb_it;
pthread_t id = pthread_self();
string database_dot_table;
const char* server_type;
Gtid gtid(std::string("62cda1d0e3a011e289d76ac0855a31e8:54"));
try {
Binary_log binlog(create_transport(uri));
binlog.connect(gtid);
replication_listener_t *rlt = (replication_listener_t*)arg;
char *uri = rlt->server_url;
map<int, string> tid2tname;
map<int, string>::iterator tb_it;
pthread_t id = pthread_self();
string database_dot_table;
const char* server_type;
Gtid gtid("62cda1d0e3a011e289d76ac0855a31e8:10");
server_type = binlog.get_mysql_server_type_str();
try {
Binary_log binlog(create_transport(uri));
binlog.connect(gtid);
cout << "Server " << uri << " type: " << server_type << endl;
server_type = binlog.get_mysql_server_type_str();
Binary_log_event *event;
cout << "Server " << uri << " type: " << server_type << endl;
while (true) {
Log_event_header *lheader;
Binary_log_event *event;
int result = binlog.wait_for_next_event(&event);
while (true) {
Log_event_header *lheader;
if (result == ERR_EOF)
break;
int result = binlog.wait_for_next_event(&event);
lheader = event->header();
if (result == ERR_EOF)
break;
switch(event->get_event_type()) {
lheader = event->header();
case QUERY_EVENT: {
Query_event *qevent = dynamic_cast<Query_event *>(event);
switch(event->get_event_type()) {
std::cout << "Thread: " << id << " server_id " << lheader->server_id
<< " position " << lheader->next_position << " : Found event of type "
<< event->get_event_type()
<< " txt " << get_event_type_str(event->get_event_type())
<< " query " << qevent->query << " db " << qevent->db_name
<< std::endl;
break;
}
case QUERY_EVENT: {
Query_event *qevent = dynamic_cast<Query_event *>(event);
case GTID_EVENT_MARIADB:
case GTID_EVENT_MYSQL:
{
Gtid_event *gevent = dynamic_cast<Gtid_event *>(event);
std::cout << "Thread: " << id << " server_id " << lheader->server_id
<< " position " << lheader->next_position << " : Found event of type "
<< event->get_event_type()
<< " txt " << get_event_type_str(event->get_event_type())
<< " query " << qevent->query << " db " << qevent->db_name
<< std::endl;
break;
}
std::cout << "Thread: " << id << " server_id " << lheader->server_id
<< " position " << lheader->next_position << " : Found event of type "
<< event->get_event_type()
<< " txt " << get_event_type_str(event->get_event_type())
<< " GTID " << gevent->m_gtid.get_string()
<< " GTID2" << gevent->m_gtid.get_mysql_gtid()
<< std::endl;
case GTID_EVENT_MARIADB:
case GTID_EVENT_MYSQL: {
Gtid_event *gevent = dynamic_cast<Gtid_event *>(event);
break;
std::cout << "Thread: " << id << " server_id " << lheader->server_id
<< " position " << lheader->next_position << " : Found event of type "
<< event->get_event_type()
<< " txt " << get_event_type_str(event->get_event_type())
<< " GTID " << std::string(gevent->m_gtid.get_mysql_gtid())
<< " GTID " << gevent->m_gtid.get_string()
<< std::endl;
}
break;
}
case TABLE_MAP_EVENT: {
Table_map_event *table_map_event= dynamic_cast<Table_map_event*>(event);
database_dot_table= table_map_event->db_name;
database_dot_table.append(".");
database_dot_table.append(table_map_event->table_name);
tid2tname[table_map_event->table_id]= database_dot_table;
break;
}
case TABLE_MAP_EVENT: {
Table_map_event *table_map_event= dynamic_cast<Table_map_event*>(event);
database_dot_table= table_map_event->db_name;
database_dot_table.append(".");
database_dot_table.append(table_map_event->table_name);
tid2tname[table_map_event->table_id]= database_dot_table;
break;
}
case WRITE_ROWS_EVENT:
case UPDATE_ROWS_EVENT:
case DELETE_ROWS_EVENT:
{
Row_event *revent = dynamic_cast<Row_event*>(event);
tb_it= tid2tname.begin();
tb_it= tid2tname.find(revent->table_id);
if (tb_it != tid2tname.end())
case WRITE_ROWS_EVENT:
case UPDATE_ROWS_EVENT:
case DELETE_ROWS_EVENT: {
Row_event *revent = dynamic_cast<Row_event*>(event);
tb_it= tid2tname.begin();
tb_it= tid2tname.find(revent->table_id);
if (tb_it != tid2tname.end())
{
database_dot_table= tb_it->second;
}
std::cout << "Thread: " << id << " server_id " << lheader->server_id
<< " position " << lheader->next_position << " : Found event of type "
<< event->get_event_type()
<< " txt " << get_event_type_str(event->get_event_type())
<< " table " << revent->table_id
<< " tb " << database_dot_table
<< std::endl;
break;
}
default:
break;
} // switch
} // while
} // try
catch(ListenerException e)
{
database_dot_table= tb_it->second;
std::cerr << "Listener exception: " << e.what() << std::endl;
}
catch(boost::system::error_code e)
{
std::cerr << "Listener system error: " << e.message() << std::endl;
}
// Try and catch all exceptions
catch(std::exception const& e)
{
std::cerr << "Listener other error: " << e.what() << std::endl;
}
// Rest of them
catch(...)
{
std::cerr << "Unknown exception: " << std::endl;
// Re-Throw this one.
// It was not handled so you want to make sure it is handled correctly by
// the OS. So just allow the exception to keep propagating.
throw;
}
std::cout << "Thread: " << id << " server_id " << lheader->server_id
<< " position " << lheader->next_position << " : Found event of type "
<< event->get_event_type()
<< " txt " << get_event_type_str(event->get_event_type())
<< " table " << revent->table_id
<< " tb " << database_dot_table
<< std::endl;
break;
}
default:
break;
} // switch
} // while
} // try
catch(ListenerException e)
{
std::cerr << "Listener exception: " << e.what() << std::endl;
}
catch(boost::system::error_code e)
{
std::cerr << "Listener system error: " << e.message() << std::endl;
}
// Try and catch all exceptions
catch(std::exception const& e)
{
std::cerr << "Listener other error: " << e.what() << std::endl;
}
// Rest of them
catch(...)
{
std::cerr << "Unknown exception: " << std::endl;
// Re-Throw this one.
// It was not handled so you want to make sure it is handled correctly by
// the OS. So just allow the exception to keep propagating.
throw;
}
pthread_exit(NULL);
return NULL;
pthread_exit(NULL);
return NULL;
}
int main(int argc, char** argv) {
int number_of_args = argc;
int i=0,k=0;
pthread_t *tid=NULL;
char *uri;
replication_listener_t *mrl;
int err=0;
int number_of_args = argc;
int i=0,k=0;
pthread_t *tid=NULL;
char *uri;
replication_listener_t *mrl;
int err=0;
tid = (pthread_t*)malloc(sizeof(pthread_t) * argc);
mrl = (replication_listener_t*)calloc(argc, sizeof(replication_listener_t));
tid = (pthread_t*)malloc(sizeof(pthread_t) * argc);
mrl = (replication_listener_t*)calloc(argc, sizeof(replication_listener_t));
if (argc < 2) {
std::cerr << "Usage: basic-2 <uri>" << std::endl;
exit(2);
}
if (argc < 2) {
std::cerr << "Usage: basic-2 <uri>" << std::endl;
exit(2);
}
mysql_server_init(num_elements, server_options, server_groups);
if (mysql_server_init(num_elements, server_options, server_groups)) {
std::cerr << "Failed to init MySQL server" << std::endl;
exit(1);
}
argc =0;
while(argc != number_of_args)
{
uri= argv[argc++];
argc =0;
while(argc != number_of_args)
{
uri= argv[argc++];
if ( strncmp("mysql://", uri, 8) == 0)
{
if ( strncmp("mysql://", uri, 8) == 0) {
mrl[i].server_url = uri;
mrl[i].server_url = uri;
if (argc == 1) {
mrl[i].is_master = 1;
}
if (argc == 1) {
mrl[i].is_master = 1;
}
err = pthread_create(&(tid[i++]), NULL, &binlog_reader, (void *)&mrl[i]);
err = pthread_create(&(tid[i++]), NULL, &binlog_reader, (void *)&mrl[i]);
if (err ) {
perror(NULL);
break;
}
if (err ) {
perror(NULL);
break;
}
}
}//end of outer while loop
}
}//end of outer while loop
for(k=0; k < i; k++)
{
err = pthread_join(tid[k], (void **)&(mrl[k]));
for(k=0; k < i; k++)
{
err = pthread_join(tid[k], (void **)&(mrl[k]));
if (err) {
perror(NULL);
}
}
if (err) {
perror(NULL);
}
}
exit(0);
exit(0);
}