From f7ddfa73bf45466effa2a61dbd30f9d4bc92e466 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Lindstr=C3=B6m?= Date: Tue, 23 Jul 2013 07:44:28 +0300 Subject: [PATCH] Fixed some issues on GTID implementation and added support for storing table replication consistency metadata on MySQL server. --- replication_listener/include/binlog_event.h | 4 +- replication_listener/include/gtid.h | 22 +- replication_listener/src/gtid.cpp | 26 +- table_replication_consistency/CMakeLists.txt | 5 +- .../table_replication_consistency.cpp | 14 +- .../table_replication_consistency.h | 23 +- .../table_replication_listener.cpp | 134 ++++- .../table_replication_listener.h | 4 +- .../table_replication_metadata.cpp | 509 ++++++++++++++++++ .../table_replication_metadata.h | 70 +++ 10 files changed, 748 insertions(+), 63 deletions(-) create mode 100644 table_replication_consistency/table_replication_metadata.cpp create mode 100644 table_replication_consistency/table_replication_metadata.h diff --git a/replication_listener/include/binlog_event.h b/replication_listener/include/binlog_event.h index 6e1d6de98..cf699ac9d 100644 --- a/replication_listener/include/binlog_event.h +++ b/replication_listener/include/binlog_event.h @@ -183,10 +183,12 @@ class Gtid_event: public Binary_log_event { public: Gtid_event(Log_event_header *header) : Binary_log_event(header) {} + + size_t gtid_length() { return MYSQL_GTID_ENCODED_SIZE;} boost::uint32_t domain_id; boost::uint32_t server_id; boost::uint64_t sequence_number; - char m_mysql_gtid[MYSQL_GTID_ENCODED_SIZE]; + unsigned char m_mysql_gtid[MYSQL_GTID_ENCODED_SIZE]; Gtid m_gtid; }; diff --git a/replication_listener/include/gtid.h b/replication_listener/include/gtid.h index 044a1d076..fbd0403ba 100644 --- a/replication_listener/include/gtid.h +++ b/replication_listener/include/gtid.h @@ -50,8 +50,9 @@ class Gtid { public: - Gtid() - : m_real_gtid(false), m_domain_id(0), m_server_id(0), m_sequence_number(0), m_server_type(MYSQL_SERVER_TYPE_NA) + Gtid() + : m_real_gtid(false), m_domain_id(0), m_server_id(0), m_sequence_number(0), + m_server_type(MYSQL_SERVER_TYPE_NA), m_gtid_length(0) { memset(m_mysql_gtid, 0, MYSQL_GTID_ENCODED_SIZE); } @@ -60,16 +61,21 @@ class Gtid const boost::uint32_t server_id, const boost::uint64_t sequence_number); - Gtid(const char *mysql_gtid, + Gtid(const unsigned char *mysql_gtid, const boost::uint64_t gno); - Gtid(const char *mysql_gtid); + Gtid(const unsigned char *mysql_gtid); ~Gtid() {} - + bool is_real_gtid() const { return m_real_gtid;} - const char* get_mysql_gtid() const { return m_mysql_gtid; } + const unsigned char* get_mysql_gtid() const { return m_mysql_gtid; } + + const unsigned char* get_gtid() const; + + size_t get_gtid_length() const { return m_gtid_length; } + std::string get_string() const; @@ -85,8 +91,10 @@ class Gtid boost::uint32_t m_domain_id; boost::uint32_t m_server_id; boost::uint64_t m_sequence_number; + boost::uint32_t m_gtid_length; - char m_mysql_gtid[MYSQL_GTID_ENCODED_SIZE]; + unsigned char m_mysql_gtid[MYSQL_GTID_ENCODED_SIZE]; + std::string m_mariadb_gtid; }; } diff --git a/replication_listener/src/gtid.cpp b/replication_listener/src/gtid.cpp index b1a2e4191..e5e1c1516 100644 --- a/replication_listener/src/gtid.cpp +++ b/replication_listener/src/gtid.cpp @@ -42,25 +42,30 @@ namespace mysql m_server_type(MYSQL_SERVER_TYPE_MARIADB) { memset(m_mysql_gtid, 0, MYSQL_GTID_ENCODED_SIZE); + + m_mariadb_gtid = to_string(m_domain_id) + std::string("-") + to_string(m_server_id) + std::string("-") + to_string(m_sequence_number); + m_gtid_length = m_mariadb_gtid.length(); } - Gtid::Gtid(const char *mysql_gtid, + Gtid::Gtid(const unsigned char *mysql_gtid, const boost::uint64_t gno) :m_real_gtid(true), m_domain_id(0), m_server_id(0), m_sequence_number(gno), - m_server_type(MYSQL_SERVER_TYPE_MYSQL) + m_server_type(MYSQL_SERVER_TYPE_MYSQL), + m_gtid_length(MYSQL_GTID_ENCODED_SIZE) { memcpy(m_mysql_gtid, mysql_gtid, MYSQL_GTID_ENCODED_SIZE); } - Gtid::Gtid(const char* mysql_gtid) + Gtid::Gtid(const unsigned char* mysql_gtid) :m_real_gtid(true), m_domain_id(0), m_server_id(0), m_sequence_number(0), - m_server_type(MYSQL_SERVER_TYPE_MYSQL) + m_server_type(MYSQL_SERVER_TYPE_MYSQL), + m_gtid_length(MYSQL_GTID_ENCODED_SIZE) { int i,k; char tmp[2]; @@ -84,10 +89,10 @@ namespace mysql std::string Gtid::get_string() const { if (m_server_type == MYSQL_SERVER_TYPE_MARIADB) { - return (to_string(m_domain_id) + std::string("-") + to_string(m_server_id) + std::string("-") + to_string(m_sequence_number)); + return (m_mariadb_gtid); } else { std::string hexs; - char *sid = (char *)m_mysql_gtid; + unsigned char *sid = (unsigned char *)m_mysql_gtid; char tmp[2]; // Dump the encoded SID using hexadesimal representation @@ -99,6 +104,15 @@ namespace mysql return(hexs + std::string(":") + to_string(m_sequence_number)); } } + + const unsigned char* Gtid::get_gtid() const + { + if (m_server_type == MYSQL_SERVER_TYPE_MARIADB) { + return ((const unsigned char *)m_mariadb_gtid.c_str()); + } else { + return (m_mysql_gtid); + } + } } diff --git a/table_replication_consistency/CMakeLists.txt b/table_replication_consistency/CMakeLists.txt index d308323c4..5183c9e5c 100644 --- a/table_replication_consistency/CMakeLists.txt +++ b/table_replication_consistency/CMakeLists.txt @@ -5,7 +5,10 @@ cmake_minimum_required (VERSION 2.6) # the library. set(table_replication_consistency_sources - table_replication_consistency.cpp table_replication_listener.cpp table_replication_parser.cpp) + table_replication_consistency.cpp + table_replication_listener.cpp + table_replication_parser.cpp + table_replication_metadata.cpp) # ---------- Find Boost Headers/Libraries ----------------------- SET(Boost_DEBUG FALSE) diff --git a/table_replication_consistency/table_replication_consistency.cpp b/table_replication_consistency/table_replication_consistency.cpp index fa6661a28..2b2dae92e 100644 --- a/table_replication_consistency/table_replication_consistency.cpp +++ b/table_replication_consistency/table_replication_consistency.cpp @@ -52,12 +52,12 @@ This function will register replication listener for every server provided and initialize all internal data structures and starts listening the replication stream. @return 0 on success, error code at failure. */ -int +int tb_replication_consistency_init( /*============================*/ replication_listener_t *rpl, /*!< in: Server definition. */ - int n_servers, /*!< in: Number of servers */ + size_t n_servers, /*!< in: Number of servers */ unsigned int gateway_server_id) /*!< in: Gateway slave server id. */ { @@ -118,7 +118,7 @@ tb_replication_consistency_init( error_handling: n_replication_listeners = i; -rpl[i].error_message = (char *)malloc(errmsg.size()+1); + rpl[i].error_message = (char *)malloc(errmsg.size()+1); strcpy(rpl[i].error_message, errmsg.c_str()); return (1); } @@ -130,14 +130,14 @@ status structures. Client must allocate memory for consistency result array and provide the maximum number of values returned. At return there is information how many results where available. @return 0 on success, error code at failure. */ -int +int tb_replication_consistency_query( /*=============================*/ table_consistency_query_t *tb_query, /*!< in: Table consistency - query. */ + query. */ table_consistency_t *tb_consistency, /*!< in: Table consistency status structure.*/ - int *n_servers) /*!< inout: Number of + size_t *n_servers) /*!< inout: Number of servers where to get table consistency status. Out: Number of successfull consistency @@ -258,8 +258,8 @@ the current status on metadata to MySQL server. @return 0 on success, error code at failure. */ int tb_replication_consistency_shutdown( - char ** error_message) /*!< out: error message */ /*================================*/ + char ** error_message) /*!< out: error message */ { int err = 0; boost::uint32_t i = 0; diff --git a/table_replication_consistency/table_replication_consistency.h b/table_replication_consistency/table_replication_consistency.h index cda9be869..28c7e3fed 100644 --- a/table_replication_consistency/table_replication_consistency.h +++ b/table_replication_consistency/table_replication_consistency.h @@ -44,11 +44,12 @@ typedef struct { int use_binlog_pos; /*!< in: 1 if binlog position should be used for binlog start position. */ - char *gtid; /*!< in: Global transaction identifier + unsigned char *gtid; /*!< in: Global transaction identifier or NULL */ + size_t gtid_length; /*!< in: Real size of GTID */ int is_master; /*!< in: Is this server a master 1 = yes, 0 = no. */ - int gateway_slave_server_id; /*!< in: replication listener slave + int gateway_slave_server_id; /*!< in: replication listener slave server id. */ int listener_id; /*!< in: listener id */ int connection_suggesfull; /*!< out: 0 if connection successfull @@ -59,13 +60,13 @@ typedef struct { /* Structure definition for table consistency query */ typedef struct table_consistency_query { - char *db_dot_table; /*!< in: Fully qualified database and + unsigned char *db_dot_table; /*!< in: Fully qualified database and table, e.g. Production.Orders. */ } table_consistency_query_t; /* Structure definition for table consistency result */ typedef struct table_consistency { - char *db_dot_table; /*!< out: Fully qualified database and + unsigned char *db_dot_table;/*!< out: Fully qualified database and table, e.g. Production.Orders. */ unsigned int server_id; /*!< out: Server id where the consitency information is from. */ @@ -75,8 +76,9 @@ typedef struct table_consistency { transaction id is known. */ unsigned long binlog_pos; /*!< out: Last seen binlog position on this server. */ - char *gtid; /*!< out: If global transacition id + unsigned char *gtid; /*!< out: If global transacition id is known, will contain the id or NULL. */ + size_t gtid_length; /*!< out: Real length of GTID */ int error_code; /*!< out: 0 if table consistency query for this server succesfull or error code. */ @@ -85,6 +87,7 @@ typedef struct table_consistency { server failed. */ } table_consistency_t; + EXTERN_C_BLOCK_BEGIN /* Interface functions */ @@ -94,12 +97,12 @@ This function will register replication listener for every server provided and initialize all internal data structures and starts listening the replication stream. @return 0 on success, error code at failure. */ -int +int tb_replication_consistency_init( /*============================*/ replication_listener_t *rpl, /*!< in: Server definition. */ - int n_servers, /*!< in: Number of servers */ + size_t n_servers, /*!< in: Number of servers */ unsigned int gateway_server_id);/*!< in: Gateway slave server id. */ @@ -110,14 +113,14 @@ status structures. Client must allocate memory for consistency result array and provide the maximum number of values returned. At return there is information how many results where available. @return 0 on success, error code at failure. */ -int +int tb_replication_consistency_query( /*=============================*/ table_consistency_query_t *tb_query, /*!< in: Table consistency - query. */ + query. */ table_consistency_t *tb_consistency, /*!< in: Table consistency status structure.*/ - int *n_servers); /*!< inout: Number of + size_t *n_servers); /*!< inout: Number of servers where to get table consistency status. Out: Number of successfull consistency diff --git a/table_replication_consistency/table_replication_listener.cpp b/table_replication_consistency/table_replication_listener.cpp index eef889880..9e10a4782 100644 --- a/table_replication_consistency/table_replication_listener.cpp +++ b/table_replication_consistency/table_replication_listener.cpp @@ -37,6 +37,7 @@ Updated: #include "table_replication_consistency.h" #include "table_replication_listener.h" #include "table_replication_parser.h" +#include "table_replication_metadata.h" using mysql::Binary_log; using mysql::system::create_transport; @@ -49,32 +50,27 @@ namespace mysql { namespace table_replication_listener { -/* Table Consistency data structure */ -typedef struct { - char* database_dot_table; /* Fully qualified db.table name, - primary key. */ - boost::uint32_t server_id; /* Server id */ - char* gtid; /* Global transaction id */ - boost::uint64_t binlog_pos; /* Binlog position */ - bool gtid_known; /* Is gtid known ? */ -} table_listener_consistency_t; - /* STL multimap containing the consistency information. Multimap is used because same table can be found from several servers. */ -multimap table_consistency_map; +multimap table_consistency_map; boost::mutex table_consistency_mutex; /* This mutex is used protect - abve data structure from + above data structure from multiple threads */ -/* We use this map to store constructed binary log connections */ +/* We use this map to store constructed binary log connections */ map table_replication_listeners; boost::mutex table_replication_mutex; /* This mutex is used protect abve data structure from multiple threads */ +bool listener_shutdown = false; /* This flag will be true + at shutdown */ +pthread_t metadata_updater_pid; /* Thread id for metadata + updater*/ + /***********************************************************************//** Internal function to update table consistency information based on log event header, table name and if GTID is known the gtid.*/ @@ -87,7 +83,7 @@ tbrl_update_consistency( Gtid& gtid) /*!< in: gtid */ { bool not_found = true; - table_listener_consistency_t *tc=NULL; + tbr_metadata_t *tc=NULL; // Need to be protected by mutex to avoid concurrency problems boost::mutex::scoped_lock lock(table_consistency_mutex); @@ -96,7 +92,7 @@ tbrl_update_consistency( not_found = true; } else { // Loop through the consistency values - for(multimap::iterator i = table_consistency_map.find(database_dot_table); + for(multimap::iterator i = table_consistency_map.find(database_dot_table); i != table_consistency_map.end(); ++i) { tc = (*i).second; if (tc->server_id == lheader->server_id) { @@ -108,23 +104,25 @@ tbrl_update_consistency( if(not_found) { // Consistency for this table and server not found, insert a record - table_listener_consistency_t* tb_c = (table_listener_consistency_t*) malloc(sizeof(table_listener_consistency_t)); - tb_c->database_dot_table = (char *)malloc(database_dot_table.size()+1); - strcpy(tb_c->database_dot_table, database_dot_table.c_str()); + tbr_metadata_t* tb_c = (tbr_metadata_t*) malloc(sizeof(tbr_metadata_t)); + tb_c->db_table = (unsigned char *)malloc(database_dot_table.size()+1); + strcpy((char *)tb_c->db_table, (char *)database_dot_table.c_str()); tb_c->server_id = lheader->server_id; tb_c->binlog_pos = lheader->next_position; tb_c->gtid_known = gtid_known; - tb_c->gtid = (char *)malloc(gtid.get_string().size()+1); - strcpy(tb_c->gtid, gtid.get_string().c_str()); + tb_c->gtid_len = gtid.get_gtid_length(); + tb_c->gtid = (unsigned char *)malloc(tb_c->gtid_len); + memcpy(tb_c->gtid, gtid.get_gtid(), tb_c->gtid_len); - table_consistency_map.insert(pair(database_dot_table,tb_c)); + table_consistency_map.insert(pair(database_dot_table,tb_c)); } else { // Consistency for this table and server found, update the // consistency values tc->binlog_pos = lheader->next_position; free(tc->gtid); - tc->gtid = (char *)malloc(gtid.get_string().size()+1); - strcpy(tc->gtid, gtid.get_string().c_str()); + tc->gtid_len = gtid.get_gtid_length(); + tc->gtid = (unsigned char *)malloc(tc->gtid_len); + memcpy(tc->gtid, gtid.get_gtid(), tc->gtid_len); tc->gtid_known = gtid_known; } } @@ -245,7 +243,8 @@ void* tb_replication_listener_reader( gtid_known = true; gtid = Gtid(gevent->domain_id, gevent->server_id, gevent->sequence_number); } else { - // TODO MYSQL + gtid_known = true; + gtid = Gtid(gevent->m_mysql_gtid); } std::cout << "Thread: " << id << " server_id " << lheader->server_id @@ -356,6 +355,8 @@ tb_replication_listener_shutdown( boost::mutex::scoped_lock lock(table_replication_mutex); map::iterator b_it; + listener_shutdown = true; + b_it = table_replication_listeners.find(server_id); if ( b_it != table_replication_listeners.end()) { @@ -408,7 +409,7 @@ status structures. Client must allocate memory for consistency result array and provide the maximum number of values returned. At return there is information how many results where available. @return 0 on success, error code at failure. */ -int +int tb_replication_listener_consistency( /*================================*/ const char *db_dot_table, /*!< in: Fully qualified table @@ -418,17 +419,17 @@ tb_replication_listener_consistency( { bool found = false; boost::uint32_t cur_server = 0; - table_listener_consistency_t *tc=NULL; + tbr_metadata_t *tc=NULL; // Need to be protected by mutex to avoid concurrency problems boost::mutex::scoped_lock lock(table_consistency_mutex); // Loop through the consistency values - for(multimap::iterator i = table_consistency_map.find(db_dot_table); + for(multimap::iterator i = table_consistency_map.find(db_dot_table); i != table_consistency_map.end(); ++i, ++cur_server) { if (cur_server == server_no) { tc = (*i).second; - memcpy(tb_consistency, tc, sizeof(table_listener_consistency_t)); + memcpy(tb_consistency, tc, sizeof(tbr_metadata_t)); found = true; break; } @@ -441,7 +442,6 @@ tb_replication_listener_consistency( } } - /***********************************************************************//** This function will reconnect replication listener to a server provided. @@ -546,6 +546,82 @@ err_exit: return (1); } +/***********************************************************************//** +This internal function is executed on its own thread and it will write +table consistency information to the master database in every n seconds +based on configuration. +*/ +void +*tb_replication_listener_metadata_updater( +/*======================================*/ + void *arg) /*!< in: Master definition */ +{ + replication_listener_t *master = (replication_listener_t*)arg; + tbr_metadata_t **tm=NULL; + + char *body = master->server_url; + size_t len = strlen(master->server_url); + + /* Find the beginning of the user name */ + strncmp(body, "//", 2); + + /* Find the user name, which is mandatory */ + const char *user = body + 2; + const char *user_end= strpbrk(user, ":@"); + + /* Find the password, which can be empty */ + assert(*user_end == ':' || *user_end == '@'); + const char *const pass = user_end + 1; // Skip the ':' (or '@') + const char *pass_end = pass; + if (*user_end == ':') + { + pass_end = strchr(pass, '@'); + } + + /* Find the host name, which is mandatory */ + // Skip the '@', if there is one + const char *host = *pass_end == '@' ? pass_end + 1 : pass_end; + const char *host_end = strchr(host, ':'); + /* If no ':' was found there is no port, so the host end at the end + * of the string */ + if (host_end == 0) + host_end = body + len; + /* Find the port number */ + unsigned long portno = 3306; + if (*host_end == ':') + portno = strtoul(host_end + 1, NULL, 10); + + while(listener_shutdown == false) { + sleep(10000); // Sleep ~10 seconds + + { + // Need to be protected by mutex to avoid concurrency problems + boost::mutex::scoped_lock lock(table_consistency_mutex); + + size_t nelems = table_consistency_map.size(); + size_t k =0; + + tm = (tbr_metadata_t**)calloc(nelems, sizeof(tbr_metadata_t*)); + + for(multimap::iterator i = table_consistency_map.begin(); + i != table_consistency_map.end(); ++i,++k) { + + tm[k] = ((*i).second); + + } + + // Release mutex + lock.unlock(); + + // Insert or update metadata information + tbrm_write_metadata(host, user, pass, portno, tm, nelems); + } + } + + pthread_exit(NULL); + return NULL; +} + } // namespace table_replication_listener } // namespace mysql diff --git a/table_replication_consistency/table_replication_listener.h b/table_replication_consistency/table_replication_listener.h index 12e87673a..bb917f90a 100644 --- a/table_replication_consistency/table_replication_listener.h +++ b/table_replication_consistency/table_replication_listener.h @@ -55,7 +55,7 @@ there is information how many results where available. int tb_replication_listener_consistency( /*================================*/ - const char *db_dot_table, /*!< in: Fully qualified table + const unsigned char *db_dot_table, /*!< in: Fully qualified table name. */ table_consistency_t *tb_consistency, /*!< out: Consistency values. */ boost::uint32_t server_no); /*!< in: Server */ @@ -79,7 +79,7 @@ int tb_replication_listener_shutdown( /*=============================*/ boost::uint32_t server_id, /*!< in: server id */ - char **error_message); /*!< out: error message */ + char **error_message); /*!< out: error message */ } // table_replication_listener diff --git a/table_replication_consistency/table_replication_metadata.cpp b/table_replication_consistency/table_replication_metadata.cpp new file mode 100644 index 000000000..720b1784b --- /dev/null +++ b/table_replication_consistency/table_replication_metadata.cpp @@ -0,0 +1,509 @@ +/* +Copyright (C) 2013, SkySQL Ab + + +This file is distributed as part of the SkySQL Gateway. It is free +software: you can redistribute it and/or modify it under the terms of the +GNU General Public License as published by the Free Software Foundation, +version 2. + +This program is distributed in the hope that it will be useful, but WITHOUT +ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +details. + +You should have received a copy of the GNU General Public License along with +this program; if not, write to the Free Software Foundation, Inc., 51 +Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +Author: Jan Lindström jan.lindstrom@skysql.com +Created: 15-07-2013 +Updated: +*/ +#include "binlog_api.h" +#include "my_pthread.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "listener_exception.h" +#include +#include +#include "table_replication_metadata.h" + +namespace mysql { + +namespace table_replication_metadata { + +/***********************************************************************//** +*/ +static void +tbrm_report_error( +/*==============*/ + MYSQL *con, + const char *message, + const char *file, + int line) +{ + fprintf(stderr, "%s at file %s line %d\n", message, file, line); + if (con != NULL) { + fprintf(stderr, "%s\n", mysql_error(con)); + mysql_close(con); + } +} + +/***********************************************************************//** +*/ +static void +tbrm_stmt_error( +/*============*/ + MYSQL_STMT *stmt, + const char *message, + const char *file, + int line) +{ + fprintf (stderr, "%s at file %s line %d\n", message, file, line); + if (stmt != NULL) + { + fprintf (stderr, "Error %u (%s): %s\n", + mysql_stmt_errno (stmt), + mysql_stmt_sqlstate (stmt), + mysql_stmt_error (stmt)); + } +} + +/***********************************************************************//** +Inspect master data dictionary and if necessary table replication +consistency metadata is not created, create it. +@return false if create failed, true if metadata already created or +create succeeded */ +static bool +tbrm_create_metadata( +/*=================*/ + const char *master_host, + const char *user, + const char *passwd, + unsigned int master_port) +{ + MYSQL *con = mysql_init(NULL); + unsigned int myerrno=0; + + if (!con) { + // TODO: start to log error and other messages + return false; + } + + if (mysql_real_connect(con, master_host, user, passwd, "mysql", mysql_port, NULL, 0) == NULL) { + tbrm_report_error(con, "Error: mysql_real_connect failed", __FILE__, __LINE__); + goto error_exit; + } + + // Check is the database there + mysql_query(con, "USE SKYSQL_GATEWAY_METADATA"); + myerrno = mysql_errno(con); + + if (myerrno != 0 && myerrno != ER_NO_DB_ERROR) { + tbrm_report_error(con, "Error: mysql_query(USE_SKYSQL_GATEWAY_METADATA) failed", __FILE__, __LINE__); + goto error_exit; + } else if (myerrno == 0) { + // Database found, assuming everyting ok + return true; + } + + // Create databse + mysql_query(con, "CREATE DATABASE SKYSQL_GATEWAY_METADATA"); + + if (mysql_errno(con) != 0) { + tbrm_report_error(con, "mysql_query(CREATE DATABASE SKYSQL_GATEWAY_METADATA) failed", __FILE__, __LINE__); + goto error_exit; + } + + // Set correct database + mysql_query(con, "USE SKYSQL_GATEWAY_METADATA"); + + if (mysql_errno(con) != 0) { + tbrm_report_error(con, "Error: mysql_query(USE_SKYSQL_GATEWAY_METADATA) failed", __FILE__, __LINE__); + goto error_exit; + } + + // Create table + mysql_query(con, "CREATE TABLE IF NOT EXISTS TABLE_REPLICATION_CONSISTENCY(" + "DB_TABLE_NAME VARCHAR(255) NOT NULL," + "SERVER_ID INT NOT NULL," + "GTID VARBINARY(255)," + "BINLOG_POS BIGINT NOT NULL," + "GTID_KNOWN INT," + "PRIMARY KEY(DB_TABLE_NAME, SERVER_ID)) ENGINE=InnoDB"); + + if (mysql_errno(con) != 0) { + tbrm_report_error(con, "Error: Create table failed", __FILE__, __LINE__); + goto error_exit; + } + + // Above clauses not really transactional, but lets play safe + mysql_query(con, "COMMIT"); + + if (mysql_errno(con) != 0) { + tbrm_report_error(con, "Error: Commit failed", __FILE__, __LINE__); + goto error_exit; + } + + mysql_close(con); + + // Done + return true; + +error_exit: + + if (con) { + mysql_close(con); + } + + return false; +} + +/***********************************************************************//** +Read table replication consistency metadata from the MySQL master server. +This function will create necessary database and table if they are not +yet created. +@return false if read failed, true if read succeeded */ +bool +tbrm_read_metadata( +/*===============*/ + const char *master_host, /*!< in: Master hostname */ + const char *user, /*!< in: username */ + const char *passwd, /*!< in: password */ + unsigned int master_port, /*!< in: master port */ + tbr_metadata_t **tbrm_meta, /*!< out: table replication consistency + metadata. */ + unsigned int *tbrm_rows) /*!< out: number of rows read */ +{ + MYSQL *con = mysql_init(NULL); + unsigned int myerrno=0; + boost::uint64_t nrows=0; + boost::uint64_t i=0; + MYSQL_RES *result = NULL; + + tbrm_create_metadata(master_host, user, passwd, master_port); + + if (!con) { + // TODO: start to log error and other messages + return false; + } + + if (mysql_real_connect(con, master_host, user, passwd, "mysql", mysql_port, NULL, 0) == NULL) { + tbrm_report_error(con, "Error: mysql_real_connect failed", __FILE__, __LINE__); + goto error_exit; + } + + mysql_query(con, "USE SKYSQL_GATEWAY_METADATA"); + myerrno = mysql_errno(con); + + if (myerrno != 0) { + tbrm_report_error(con, "Error: Database set failed", __FILE__, __LINE__); + goto error_exit; + } + + mysql_query(con, "SELECT * FROM TABLE_REPLICATION_CONSISTENCY"); + myerrno = mysql_errno(con); + + if (myerrno != 0) { + tbrm_report_error(con,"Error: Select from table_replication_consistency failed", __FILE__, __LINE__); + goto error_exit; + } + + result = mysql_store_result(con); + + if (!result) { + tbrm_report_error(con, "Error: mysql_store_result failed", __FILE__, __LINE__); + goto error_exit; + } + + nrows = mysql_num_rows(result); + + *tbrm_meta = (tbr_metadata_t*) calloc(nrows, sizeof(tbr_metadata_t)); + *tbrm_rows = nrows; + + for(i=0;i < nrows; i++) { + MYSQL_ROW row = mysql_fetch_row(result); + unsigned long *lengths = mysql_fetch_lengths(result); + // DB_TABLE_NAME + tbrm_meta[i]->db_table = (unsigned char *)malloc(lengths[0]); + strcpy((char *)tbrm_meta[i]->db_table, row[0]); + // SERVER_ID + tbrm_meta[i]->server_id = atol(row[1]); + // GTID + tbrm_meta[i]->gtid = (unsigned char *)malloc((lengths[2])*sizeof(unsigned char)); + memcpy(tbrm_meta[i]->gtid, row[2], lengths[2]); + tbrm_meta[i]->gtid_len = lengths[2]; + // BINLOG_POS + tbrm_meta[i]->binlog_pos = atoll(row[3]); + // GTID_KNOWN + tbrm_meta[i]->gtid_known = atol(row[4]); + } + + mysql_free_result(result); + mysql_close(con); + + return true; + + error_exit: + + if (result) { + mysql_free_result(result); + } + + if (con) { + mysql_close(con); + } + + return false; +} + + +/***********************************************************************//** +Write table replication consistency metadata from the MySQL master server. +This function assumes that necessary database and table are created. +@return false if read failed, true if read succeeded */ +bool +tbrm_write_metadata( +/*================*/ + const char *master_host, /*!< in: Master hostname */ + const char *user, /*!< in: username */ + const char *passwd, /*!< in: password */ + unsigned int master_port, /*!< in: master port */ + tbr_metadata_t **tbrm_meta, /*!< in: table replication consistency + metadata. */ + boost::uint32_t tbrm_rows) /*!< in: number of rows read */ +{ + MYSQL *con = mysql_init(NULL); + int myerrno=0; + boost::uint32_t i; + MYSQL_STMT *sstmt=NULL; + MYSQL_STMT *istmt=NULL; + MYSQL_STMT *ustmt=NULL; + MYSQL_BIND sparam[2]; + MYSQL_BIND iparam[5]; + MYSQL_BIND uparam[5]; + MYSQL_BIND result[1]; + char *dbtable; + void *gtid; + int gtidknown; + int serverid; + boost::uint64_t binlogpos; + + // Query to find out if the row already exists on table + const char *sst = "SELECT BINLOG_POS FROM TABLE_REPLICATION_CONSISTENCY WHERE" + " DB_TABLE_NAME='?' and SERVER_ID=?"; + + // Insert Query + const char *ist = "INSERT INTO TABLE_REPLICATION_CONSISTENCY(DB_TABLE_NAME," + " SERVER_ID, GTID, BINLOG_POS, GTID_KNOWN) VALUES" + "(?, ?, ?, ?, ?)"; + + // Update Query + const char *ust = "UPDATE TABLE_REPLICATION_CONSISTENCY " + "SET GTID=?, BINLOG_POS=?, GTID_KNOWN=?" + " WHERE DB_TABLE_NAME=? AND SERVER_ID=?"; + + if (!con) { + // TODO: start to log error and other messages + return false; + } + + if (mysql_real_connect(con, master_host, user, passwd, "mysql", mysql_port, NULL, 0) == NULL) { + tbrm_report_error(con, "Error: mysql_real_connect failed", __FILE__, __LINE__); + } + + mysql_query(con, "USE SKYSQL_GATEWAY_METADATA"); + myerrno = mysql_errno(con); + + if (myerrno != 0) { + tbrm_report_error(con, "Error: Database set failed", __FILE__, __LINE__); + } + + // Allocate statement handlers + sstmt = mysql_stmt_init(con); + istmt = mysql_stmt_init(con); + ustmt = mysql_stmt_init(con); + + if (sstmt == NULL || istmt == NULL || ustmt == NULL) { + tbrm_report_error(con, "Could not initialize statement handler", __FILE__, __LINE__); + goto error_exit; + } + + // Prepare the statements + if (mysql_stmt_prepare(sstmt, sst, strlen(sst)) != 0) { + tbrm_stmt_error(sstmt, "Error: Could not prepare select statement", __FILE__, __LINE__); + goto error_exit; + } + if (mysql_stmt_prepare(istmt, ist, strlen(ist)) != 0) { + tbrm_stmt_error(istmt, "Error: Could not prepare insert statement", __FILE__, __LINE__); + goto error_exit; + } + if (mysql_stmt_prepare(ustmt, ust, strlen(ust)) != 0) { + tbrm_stmt_error(ustmt, "Error: Could not prepare update statement", __FILE__, __LINE__); + goto error_exit; + } + + // Initialize the parameters + memset (sparam, 0, sizeof (sparam)); + memset (iparam, 0, sizeof (iparam)); + memset (uparam, 0, sizeof (uparam)); + memset (result, 0, sizeof (result)); + + // Init param structure + // Select + sparam[0].buffer_type = MYSQL_TYPE_VARCHAR; + sparam[0].buffer = (void *) dbtable; + sparam[1].buffer_type = MYSQL_TYPE_LONG; + sparam[1].buffer = (void *) &serverid; + // Insert + iparam[0].buffer_type = MYSQL_TYPE_VARCHAR; + iparam[0].buffer = (void *) dbtable; + iparam[1].buffer_type = MYSQL_TYPE_LONG; + iparam[1].buffer = (void *) &serverid; + iparam[2].buffer_type = MYSQL_TYPE_BLOB; + iparam[2].buffer = (void *) gtid; + iparam[3].buffer_type = MYSQL_TYPE_LONGLONG; + iparam[3].buffer = (void *) &binlogpos; + iparam[4].buffer_type = MYSQL_TYPE_SHORT; + iparam[4].buffer = (void *) >idknown; + // Update + uparam[0].buffer_type = MYSQL_TYPE_BLOB; + uparam[0].buffer = (void *) gtid; + uparam[1].buffer_type = MYSQL_TYPE_LONGLONG; + uparam[1].buffer = (void *) &binlogpos; + uparam[2].buffer_type = MYSQL_TYPE_SHORT; + uparam[2].buffer = (void *) >idknown; + uparam[3].buffer_type = MYSQL_TYPE_VARCHAR; + uparam[3].buffer = (void *) dbtable; + uparam[4].buffer_type = MYSQL_TYPE_LONG; + uparam[4].buffer = (void *) &serverid; + // Result set for select + result[0].buffer_type = MYSQL_TYPE_LONGLONG; + result[0].buffer = &binlogpos; + + + // Iterate through the data + for(i = 0; i < tbrm_rows; i++) { + // Start from Select, we need to know if the consistency + // information for this table, server pair is already + // in metadata or not. + + dbtable = (char *)tbrm_meta[i]->db_table; + gtid = (char *)tbrm_meta[i]->gtid; + gtidknown = tbrm_meta[i]->gtid_known; + binlogpos = tbrm_meta[i]->binlog_pos; + serverid = tbrm_meta[i]->server_id; + + sparam[0].buffer_length = strlen(dbtable); + uparam[3].buffer_length = sparam[0].buffer_length; + iparam[0].buffer_length = sparam[0].buffer_length; + uparam[0].buffer_length = tbrm_meta[i]->gtid_len; + iparam[2].buffer_length = tbrm_meta[i]->gtid_len; + + // Bind param structure to statement + if (mysql_stmt_bind_param(sstmt, sparam) != 0) { + tbrm_stmt_error(sstmt, "Error: Could not bind select parameters", __FILE__, __LINE__); + goto error_exit; + } + + // Bind result structure to statement + if (mysql_stmt_bind_result(sstmt, result) != 0) { + tbrm_stmt_error(sstmt, "Error: Could not bind select return parameters", __FILE__, __LINE__); + goto error_exit; + } + + // Execute!! + if (mysql_stmt_execute(sstmt) != 0) { + tbrm_stmt_error(sstmt, "Error: Could not execute select statement", __FILE__, __LINE__); + goto error_exit; + } + + // Store result + if (mysql_stmt_store_result(sstmt) != 0) { + tbrm_stmt_error(sstmt, "Error: Could not buffer result set", __FILE__, __LINE__); + goto error_exit; + } + + // Fetch result + myerrno = mysql_stmt_fetch(sstmt); + if (myerrno == 1) { + tbrm_stmt_error(sstmt, "Error: Could not fetch result set", __FILE__, __LINE__); + goto error_exit; + } + + // If fetch returned 0, it means that this table, serverid + // pair was found from metadata, we might need to update + // the consistency information. + if (myerrno == 0 && binlogpos != tbrm_meta[i]->binlog_pos) { + // Update the consistency information + binlogpos = tbrm_meta[i]->binlog_pos; + + // Bind param structure to statement + if (mysql_stmt_bind_param(ustmt, uparam) != 0) { + tbrm_stmt_error(ustmt, "Error: Could not bind update parameters", __FILE__, __LINE__); + goto error_exit; + } + // Execute!! + if (mysql_stmt_execute(ustmt) != 0) { + tbrm_stmt_error(ustmt, "Error: Could not execute update statement", __FILE__, __LINE__); + goto error_exit; + } + } else { + // Insert the consistency information + binlogpos = tbrm_meta[i]->binlog_pos; + // Bind param structure to statement + if (mysql_stmt_bind_param(istmt, iparam) != 0) { + tbrm_stmt_error(istmt, "Error: Could not bind insert parameters", __FILE__, __LINE__); + goto error_exit; + } + + // Execute!! + if (mysql_stmt_execute(istmt) != 0) { + tbrm_stmt_error(istmt, "Error: Could not execute insert statement", __FILE__, __LINE__); + goto error_exit; + } + } + } + + return true; + + error_exit: + // Cleanup + if (sstmt) { + if (mysql_stmt_close(sstmt)) { + tbrm_stmt_error(sstmt, "Error: Could not close select statement", __FILE__, __LINE__); + } + } + + if (istmt) { + if (mysql_stmt_close(istmt)) { + tbrm_stmt_error(istmt, "Error: Could not close select statement", __FILE__, __LINE__); + } + } + + if (ustmt) { + if (mysql_stmt_close(ustmt)) { + tbrm_stmt_error(ustmt, "Error: Could not close select statement", __FILE__, __LINE__); + } + } + + if (con) { + mysql_close(con); + } + + return false; + +} + +} // table_replication_metadata + +} // mysql diff --git a/table_replication_consistency/table_replication_metadata.h b/table_replication_consistency/table_replication_metadata.h new file mode 100644 index 000000000..cced1782e --- /dev/null +++ b/table_replication_consistency/table_replication_metadata.h @@ -0,0 +1,70 @@ +/* +Copyright (C) 2013, SkySQL Ab + + +This file is distributed as part of the SkySQL Gateway. It is free +software: you can redistribute it and/or modify it under the terms of the +GNU General Public License as published by the Free Software Foundation, +version 2. + +This program is distributed in the hope that it will be useful, but WITHOUT +ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +details. + +You should have received a copy of the GNU General Public License along with +this program; if not, write to the Free Software Foundation, Inc., 51 +Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +Author: Jan Lindström jan.lindstrom@skysql.com +Created: 15-07-2013 +Updated: +*/ + +#ifndef TABLE_REPLICATION_METADATA_H +#define TABLE_REPLICATION_METADATA_H + +/* Structure definition for table replication oconsistency metadata */ +typedef struct { + unsigned char* db_table; /* Fully qualified db.table name, + primary key. */ + boost::uint32_t server_id; /* Server id */ + unsigned char* gtid; /* Global transaction id */ + boost::uint32_t gtid_len; /* Length of gtid */ + boost::uint64_t binlog_pos; /* Binlog position */ + bool gtid_known; /* Is gtid known ? */ +} tbr_metadata_t; + +/***********************************************************************//** +Read table replication consistency metadata from the MySQL master server. +This function assumes that necessary database and table are created. +@return false if read failed, true if read succeeded */ +bool +tbrm_read_metadata( +/*===============*/ + const char *master_host, /*!< in: Master hostname */ + const char *user, /*!< in: username */ + const char *passwd, /*!< in: password */ + unsigned int master_port, /*!< in: master port */ + tbr_metadata_t **tbrm_meta, /*!< out: table replication consistency + metadata. */ + size_t *tbrm_rows); /*!< out: number of rows read */ + +/***********************************************************************//** +Write table replication consistency metadata from the MySQL master server. +This function assumes that necessary database and table are created. +@return false if read failed, true if read succeeded */ +bool +tbrm_write_metadata( +/*================*/ + const char *master_host, /*!< in: Master hostname */ + const char *user, /*!< in: username */ + const char *passwd, /*!< in: password */ + unsigned int master_port, /*!< in: master port */ + tbr_metadata_t **tbrm_meta, /*!< in: table replication consistency + metadata. */ + size_t tbrm_rows); /*!< in: number of rows read */ + +#endif + +