diff --git a/replication_listener/examples/CMakeLists.txt b/replication_listener/examples/CMakeLists.txt index b3df902f2..e43df097f 100644 --- a/replication_listener/examples/CMakeLists.txt +++ b/replication_listener/examples/CMakeLists.txt @@ -5,18 +5,18 @@ link_directories(${PROJECT_BUILD_DIR}/lib) include_directories(${PROJECT_BUILD_DIR}/include) # Find MySQL client library and header files -find_library(MySQL_LIBRARY NAMES libmysqld.a PATHS -/usr/lib64/mysql /usr/lib/mysql /usr/local/mysql/lib) find_path(MySQL_INCLUDE_DIR mysql.h /usr/local/include/mysql /usr/include/mysql) include_directories(${MySQL_INCLUDE_DIR}) +include_directories(../../table_replication_consistency) +include_directories(../../utils) # Create build rules for all the simple examples that only require a # single file. foreach(prog basic-1 basic-2 jan_test) - ADD_EXECUTABLE(${prog} ${prog}.cpp) - TARGET_LINK_LIBRARIES(${prog} replication boost_system pthread aio crypt ${MySQL_LIBRARY}) + ADD_EXECUTABLE(${prog} ${prog}.cpp /usr/local/mysql/lib/libmysqld.a) + TARGET_LINK_LIBRARIES(${prog} replication boost_system boost_thread pthread aio crypt ${MySQL_LIBRARY}) endforeach() add_subdirectory(mysql2lucene EXCLUDE_FROM_ALL) diff --git a/replication_listener/examples/jan_test.cpp b/replication_listener/examples/jan_test.cpp index 21395d1e2..f4dde96eb 100644 --- a/replication_listener/examples/jan_test.cpp +++ b/replication_listener/examples/jan_test.cpp @@ -20,7 +20,7 @@ using namespace std; using namespace mysql::system; static char* server_options[] = { - "jan test", + "jan_test", "--datadir=/tmp/", "--skip-innodb", "--default-storage-engine=myisam", @@ -30,11 +30,10 @@ static char* server_options[] = { const int num_elements = (sizeof(server_options) / sizeof(char *)) - 1; static char* server_groups[] = { - "embedded", - "server", - "server", - "server", - NULL + "libmysqld_server", + "libmysqld_client", + "libmysqld_server", + "libmysqld_server", NULL }; void* binlog_reader(void * arg) @@ -46,11 +45,11 @@ void* binlog_reader(void * arg) pthread_t id = pthread_self(); string database_dot_table; const char* server_type; - Gtid gtid("62cda1d0e3a011e289d76ac0855a31e8:10"); + Gtid gtid(); try { Binary_log binlog(create_transport(uri)); - binlog.connect(gtid); + binlog.connect(); server_type = binlog.get_mysql_server_type_str(); @@ -90,7 +89,7 @@ void* binlog_reader(void * arg) << " position " << lheader->next_position << " : Found event of type " << event->get_event_type() << " txt " << get_event_type_str(event->get_event_type()) - << " GTID " << std::string(gevent->m_gtid.get_mysql_gtid()) + << " GTID " << std::string((char *)gevent->m_gtid.get_gtid()) << " GTID " << gevent->m_gtid.get_string() << std::endl; @@ -177,7 +176,7 @@ int main(int argc, char** argv) { exit(2); } - if (mysql_server_init(num_elements, server_options, server_groups)) { + if (mysql_library_init(num_elements, server_options, server_groups)) { std::cerr << "Failed to init MySQL server" << std::endl; exit(1); } diff --git a/replication_listener/src/tcp_driver.cpp b/replication_listener/src/tcp_driver.cpp index b69d332b0..caeabbc2d 100644 --- a/replication_listener/src/tcp_driver.cpp +++ b/replication_listener/src/tcp_driver.cpp @@ -239,7 +239,8 @@ int Binlog_tcp_driver::fetch_server_version(const std::string& user, throw(ListenerException(std::string("mysql_init() failed"), __FILE__, __LINE__)); } - mysql_options(mysql, MYSQL_READ_DEFAULT_GROUP, "client"); + + mysql_options(mysql, MYSQL_READ_DEFAULT_GROUP, "libmysqld_client"); mysql_options(mysql, MYSQL_OPT_RECONNECT, &reconnect); mysql_options(mysql, MYSQL_OPT_USE_REMOTE_CONNECTION, NULL); diff --git a/table_replication_consistency/table_replication_consistency.cpp b/table_replication_consistency/table_replication_consistency.cpp index 90ad45429..32f72b6e4 100644 --- a/table_replication_consistency/table_replication_consistency.cpp +++ b/table_replication_consistency/table_replication_consistency.cpp @@ -111,6 +111,25 @@ tb_replication_consistency_init( tbr_trace = true; } + // Find out the master server + for(i=0;i < n_servers; i++) { + if (rpl[i].is_master) { + break; + } + } + + // If master is found read metadata from MySQL server, if not report error + if (i < n_servers) { + char *errm = NULL; + if(!tb_replication_listener_init(&(rpl[i]), &errm)) { + errmsg = std::string(errm); + free(errm); + } + } else { + errmsg = string("Master server is missing from configuration"); + goto error_handling; + } + // Start replication stream reader thread for every server in the configuration for(i=0;i < n_servers; i++) { // We need to try catch all exceptions here because function @@ -135,6 +154,7 @@ tb_replication_consistency_init( } // Start actual replication listener + /* err = pthread_create( &replication_listener_tid[i], NULL, @@ -145,6 +165,7 @@ tb_replication_consistency_init( errmsg = string(strerror(err)); goto error_handling; } + */ } // Replication listener will use this exception for diff --git a/table_replication_consistency/table_replication_listener.cpp b/table_replication_consistency/table_replication_listener.cpp index 43f0ea88f..5c7f5d8e2 100644 --- a/table_replication_consistency/table_replication_listener.cpp +++ b/table_replication_consistency/table_replication_listener.cpp @@ -57,25 +57,31 @@ namespace table_replication_listener { because same table can be found from several servers. */ multimap table_consistency_map; -boost::mutex table_consistency_mutex; /* This mutex is used protect +boost::mutex table_consistency_mutex; /* This mutex is used to protect above data structure from multiple threads */ /* We use this map to store constructed binary log connections */ map table_replication_listeners; -boost::mutex table_replication_mutex; /* This mutex is used protect - abve data structure from +boost::mutex table_replication_mutex; /* This mutex is used to protect + above data structure from multiple threads */ +/* We use this map to store table consistency server metadata */ +map table_replication_servers; + +boost::mutex table_servers_mutex; /* This mutex is used to proted + above data structure from multiple + threads */ + replication_listener_t *master; /* Master server definition */ /* Master connect info */ -const char *master_user=NULL; -const char *master_passwd=NULL; -const char *master_host=NULL; -unsigned long master_port=3306; - +char *master_user=NULL; +char *master_passwd=NULL; +char *master_host=NULL; +unsigned long master_port=3307; /***********************************************************************//** Internal function to extract user, passwd, hostname and port from @@ -87,33 +93,50 @@ tbrl_extract_master_connect_info() char *body = master->server_url; size_t len = strlen(master->server_url); - /* Find the beginning of the user name */ - strncmp(body, "//", 2); + /* Find the user name, which is mandatory */ + const char *user = body + 8; - /* Find the user name, which is mandatory */ - master_user = body + 2; - const char *user_end= strpbrk(master_user, ":@"); + const char *user_end= strpbrk(user, ":@"); - /* Find the password, which can be empty */ - assert(*user_end == ':' || *user_end == '@'); - master_passwd = user_end + 1; // Skip the ':' (or '@') - const char *pass_end = master_passwd; - if (*user_end == ':') - { - pass_end = strchr(master_passwd, '@'); - } + assert(user_end - user >= 1); // There has to be a username - /* Find the host name, which is mandatory */ - // Skip the '@', if there is one - master_host = *pass_end == '@' ? pass_end + 1 : pass_end; - const char *host_end = strchr(master_host, ':'); - /* If no ':' was found there is no port, so the host end at the end - * of the string */ - if (host_end == 0) - host_end = body + len; - /* Find the port number */ - if (*host_end == ':') - master_port = strtoul(host_end + 1, NULL, 10); + /* Find the password, which can be empty */ + assert(*user_end == ':' || *user_end == '@'); + const char *const pass = user_end + 1; // Skip the ':' (or '@') + const char *pass_end = pass; + if (*user_end == ':') + { + pass_end = strchr(pass, '@'); + } + assert(pass_end - pass >= 0); // Password can be empty + + /* Find the host name, which is mandatory */ + // Skip the '@', if there is one + const char *host = *pass_end == '@' ? pass_end + 1 : pass_end; + const char *host_end = strchr(host, ':'); + if (host == host_end) + /* If no ':' was found there is no port, so the host end at the end + * of the string */ + if (host_end == 0) + host_end = body + len; + assert(host_end - host >= 1); // There has to be a host + + /* Find the port number */ + unsigned long portno = 3307; + if (*host_end == ':') + portno = strtoul(host_end + 1, NULL, 10); + + std::string u(user, user_end - user); + std::string p(pass, pass_end - pass); + std::string h(host, host_end - host); + + master_user = (char *)malloc(u.length()+1); + master_passwd = (char *)malloc(p.length()+1); + master_host = (char *)malloc(h.length()+1); + strcpy(master_user, u.c_str()); + strcpy(master_passwd, p.c_str()); + strcpy(master_host, h.c_str()); + master_port = portno; } /***********************************************************************//** @@ -180,6 +203,58 @@ tbrl_update_consistency( } +/***********************************************************************//** +Internal function to update table replication consistency server status +based on log event header and gtid if known*/ +static void +tbrl_update_server_status( +/*======================*/ + Log_event_header *lheader, /*!< in: Log event header */ + bool gtid_known, /*!< in: is GTID known */ + Gtid& gtid) /*!< in: gtid */ +{ + bool not_found = true; + tbr_server_t *ts=NULL; + + // Need to be protected by mutex to avoid concurrency problems + boost::mutex::scoped_lock lock(table_servers_mutex); + + if(table_replication_servers.find(lheader->server_id) == table_replication_servers.end()) { + not_found = true; + } else { + not_found = false; + } + + if(not_found) { + // Consistency for this server not found, insert a record + ts = (tbr_server_t*) malloc(sizeof(tbr_server_t)); + ts->server_id = lheader->server_id; + ts->binlog_pos = lheader->next_position; + ts->gtid_known = gtid_known; + ts->gtid_len = gtid.get_gtid_length(); + ts->gtid = (unsigned char *)malloc(ts->gtid_len); + memcpy(ts->gtid, gtid.get_gtid(), ts->gtid_len); + + table_replication_servers.insert(pair(lheader->server_id, ts)); + } else { + // Consistency for this server found, update the consistency values + ts->binlog_pos = lheader->next_position; + free(ts->gtid); + ts->gtid_len = gtid.get_gtid_length(); + ts->gtid = (unsigned char *)malloc(ts->gtid_len); + memcpy(ts->gtid, gtid.get_gtid(), ts->gtid_len); + ts->gtid_known = gtid_known; + } + + if (tbr_trace) { + // This will log error to log file + skygw_log_write_flush(NULL, LOGFILE_TRACE, + (char *)"TRC Trace: Current state for server %d binlog_pos %lu GTID '%s'", + ts->server_id, ts->binlog_pos, gtid.get_string().c_str()); + } +} + + /***********************************************************************//** This is the function that is executed by replication listeners. At startup it will try to connect the server and start listening @@ -204,7 +279,7 @@ void* tb_replication_listener_reader( try { Binary_log binlog(create_transport(uri), uri); - binlog.connect(gtid); + binlog.connect(); { // Need to be protected by mutex to avoid concurrency problems @@ -238,6 +313,9 @@ void* tb_replication_listener_reader( lheader = event->header(); + // Insert or update current server status + tbrl_update_server_status(lheader, gtid_known, gtid); + switch(event->get_event_type()) { case QUERY_EVENT: { @@ -680,7 +758,7 @@ void tbrl_extract_master_connect_info(); while(listener_shutdown == false) { - sleep(10000); // Sleep ~10 seconds + sleep(10); // Sleep ~10 seconds try { // Need to be protected by mutex to avoid concurrency problems @@ -702,11 +780,11 @@ void lock.unlock(); // Insert or update metadata information - err = tbrm_write_metadata( + err = tbrm_write_consistency_metadata( (const char *)master_host, (const char *)master_user, (const char *)master_passwd, - (unsigned int)master_port, + master_port, tm, nelems); @@ -750,6 +828,80 @@ my_exit: return NULL; } +/***********************************************************************//** +Read current state of the metadata from the MySQL server or create +necessary metadata and initialize listener metadata. +@return true on success, false on failure +*/ +bool +tb_replication_listener_init( +/*=========================*/ + replication_listener_t* rpl, /*! in: Master server definition */ + char **error_message) /*!< out: error message */ +{ + tbr_metadata_t *tm = NULL; + size_t tm_rows = 0; + std::string dbtable; + std::string err; + + master = rpl; + // Set up master connect info + tbrl_extract_master_connect_info(); + + + try { + if (!tbrm_read_consistency_metadata((const char *)master_host, + (const char *)master_user, + (const char *)master_passwd, + (unsigned int)master_port, + &tm, + &tm_rows)) { + err = std::string("Error: reading metadata failed"); + goto error_exit; + } + + for(size_t i=0;i < tm_rows; i++) { + tbr_metadata_t *t = &(tm[i]); + dbtable = std::string((char *)t->db_table); + + table_consistency_map.insert(pair(dbtable, t)); + } + } + catch(ListenerException e) + { + err = std::string("Listener exception: ")+ e.what(); + skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str()); + goto error_exit; + } + catch(boost::system::error_code e) + { + err = std::string("Listener system exception: ")+ e.message(); + skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str()); + goto error_exit; + } + // Try and catch all exceptions + catch(std::exception const& e) + { + err = std::string("Listener other exception: ")+ e.what(); + skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str()); + goto error_exit; + } + // Rest of them + catch(...) + { + err = std::string("Unknown exception: "); + skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str()); + goto error_exit; + } + + return true; + +error_exit: + *error_message = (char *)malloc(err.length()+1); + strcpy(*error_message, err.c_str()); + return false; +} + /***********************************************************************//** Write current state of the metadata to the MySQL server and clean up the data structures. @@ -773,7 +925,13 @@ tb_replication_listener_done( } // Insert or update metadata information - err = tbrm_write_metadata(master_host, master_user, master_passwd, master_port, tm, (size_t)nelems); + err = tbrm_write_consistency_metadata( + (const char *)master_host, + (const char *)master_user, + (const char *)master_passwd, + (unsigned int)master_port, + tm, + (size_t)nelems); free(tm); diff --git a/table_replication_consistency/table_replication_listener.h b/table_replication_consistency/table_replication_listener.h index f3373dc62..c207a37d0 100644 --- a/table_replication_consistency/table_replication_listener.h +++ b/table_replication_consistency/table_replication_listener.h @@ -91,6 +91,17 @@ void /*======================================*/ void *arg); /*!< in: Master definition */ +/***********************************************************************//** +Read current state of the metadata from the MySQL server or create +necessary metadata and initialize listener metadata. +@return true on success, false on failure +*/ +bool +tb_replication_listener_init( +/*=========================*/ + replication_listener_t* rpl, /*! in: Master server definition */ + char **error_message); /*!< out: error message */ + /***********************************************************************//** Write current state of the metadata to the MySQL server and clean up the data structures. diff --git a/table_replication_consistency/table_replication_metadata.cpp b/table_replication_consistency/table_replication_metadata.cpp index 897364099..f024fbaf6 100644 --- a/table_replication_consistency/table_replication_metadata.cpp +++ b/table_replication_consistency/table_replication_metadata.cpp @@ -102,11 +102,15 @@ tbrm_create_metadata( unsigned int myerrno=0; if (!con) { - // TODO: start to log error and other messages + skygw_log_write_flush(NULL, LOGFILE_ERROR, + (char *)"Mysql init failed", mysql_error(con)); return false; } - if (mysql_real_connect(con, master_host, user, passwd, "mysql", mysql_port, NULL, 0) == NULL) { + mysql_options(con, MYSQL_READ_DEFAULT_GROUP, "libmysqld_client"); + mysql_options(con, MYSQL_OPT_USE_REMOTE_CONNECTION, NULL); + + if (!mysql_real_connect(con, master_host, user, passwd, NULL, master_port, NULL, 0)) { tbrm_report_error(con, "Error: mysql_real_connect failed", __FILE__, __LINE__); goto error_exit; } @@ -115,12 +119,12 @@ tbrm_create_metadata( mysql_query(con, "USE SKYSQL_GATEWAY_METADATA"); myerrno = mysql_errno(con); - if (myerrno != 0 && myerrno != ER_NO_DB_ERROR) { - tbrm_report_error(con, "Error: mysql_query(USE_SKYSQL_GATEWAY_METADATA) failed", __FILE__, __LINE__); - goto error_exit; - } else if (myerrno == 0) { + if (myerrno == 0) { // Database found, assuming everyting ok return true; + } else if (myerrno != ER_BAD_DB_ERROR) { + tbrm_report_error(con, "Error: mysql_query(USE_SKYSQL_GATEWAY_METADATA) failed", __FILE__, __LINE__); + goto error_exit; } // Create databse @@ -139,8 +143,8 @@ tbrm_create_metadata( goto error_exit; } - // Create table - mysql_query(con, "CREATE TABLE IF NOT EXISTS TABLE_REPLICATION_CONSISTENCY(" + // Create consistency table + mysql_query(con, "CREATE TABLE TABLE_REPLICATION_CONSISTENCY(" "DB_TABLE_NAME VARCHAR(255) NOT NULL," "SERVER_ID INT NOT NULL," "GTID VARBINARY(255)," @@ -161,6 +165,28 @@ tbrm_create_metadata( goto error_exit; } + // Create servers table + mysql_query(con, "CREATE TABLE TABLE_REPLICATION_SERVERS(" + "SERVER_ID INT NOT NULL," + "BINLOG_POS BIGINT NOT NULL," + "GTID VARBINARY(255)," + "GTID_KNOWN INT," + "SERVER_TYPE INT," + "PRIMARY KEY(SERVER_ID)) ENGINE=InnoDB"); + + if (mysql_errno(con) != 0) { + tbrm_report_error(con, "Error: Create table failed", __FILE__, __LINE__); + goto error_exit; + } + + // Above clauses not really transactional, but lets play safe + mysql_query(con, "COMMIT"); + + if (mysql_errno(con) != 0) { + tbrm_report_error(con, "Error: Commit failed", __FILE__, __LINE__); + goto error_exit; + } + mysql_close(con); // Done @@ -181,17 +207,16 @@ This function will create necessary database and table if they are not yet created. @return false if read failed, true if read succeeded */ bool -tbrm_read_metadata( -/*===============*/ +tbrm_read_consistency_metadata( +/*===========================*/ const char *master_host, /*!< in: Master hostname */ const char *user, /*!< in: username */ const char *passwd, /*!< in: password */ unsigned int master_port, /*!< in: master port */ tbr_metadata_t **tbrm_meta, /*!< out: table replication consistency metadata. */ - unsigned int *tbrm_rows) /*!< out: number of rows read */ + size_t *tbrm_rows) /*!< out: number of rows read */ { - MYSQL *con = mysql_init(NULL); unsigned int myerrno=0; boost::uint64_t nrows=0; boost::uint64_t i=0; @@ -199,12 +224,17 @@ tbrm_read_metadata( tbrm_create_metadata(master_host, user, passwd, master_port); + MYSQL *con = mysql_init(NULL); + if (!con) { // TODO: start to log error and other messages return false; } - if (mysql_real_connect(con, master_host, user, passwd, "mysql", mysql_port, NULL, 0) == NULL) { + mysql_options(con, MYSQL_READ_DEFAULT_GROUP, "libmysqld_client"); + mysql_options(con, MYSQL_OPT_USE_REMOTE_CONNECTION, NULL); + + if (!mysql_real_connect(con, master_host, user, passwd, NULL, master_port, NULL, 0)) { tbrm_report_error(con, "Error: mysql_real_connect failed", __FILE__, __LINE__); goto error_exit; } @@ -273,14 +303,13 @@ tbrm_read_metadata( return false; } - /***********************************************************************//** Write table replication consistency metadata from the MySQL master server. This function assumes that necessary database and table are created. @return false if read failed, true if read succeeded */ bool -tbrm_write_metadata( -/*================*/ +tbrm_write_consistency_metadata( +/*============================*/ const char *master_host, /*!< in: Master hostname */ const char *user, /*!< in: username */ const char *passwd, /*!< in: password */ @@ -289,7 +318,6 @@ tbrm_write_metadata( metadata. */ size_t tbrm_rows) /*!< in: number of rows read */ { - MYSQL *con = mysql_init(NULL); int myerrno=0; boost::uint32_t i; MYSQL_STMT *sstmt=NULL; @@ -319,13 +347,19 @@ tbrm_write_metadata( "SET GTID=?, BINLOG_POS=?, GTID_KNOWN=?" " WHERE DB_TABLE_NAME=? AND SERVER_ID=?"; + MYSQL *con = mysql_init(NULL); + if (!con) { // TODO: start to log error and other messages return false; } - if (mysql_real_connect(con, master_host, user, passwd, "mysql", mysql_port, NULL, 0) == NULL) { + mysql_options(con, MYSQL_READ_DEFAULT_GROUP, "libmysqld_client"); + mysql_options(con, MYSQL_OPT_USE_REMOTE_CONNECTION, NULL); + + if (!mysql_real_connect(con, master_host, user, passwd, NULL, master_port, NULL, 0)) { tbrm_report_error(con, "Error: mysql_real_connect failed", __FILE__, __LINE__); + goto error_exit; } mysql_query(con, "USE SKYSQL_GATEWAY_METADATA"); @@ -407,7 +441,6 @@ tbrm_write_metadata( dbtable = (char *)tbrm_meta[i]->db_table; gtid = (char *)tbrm_meta[i]->gtid; gtidknown = tbrm_meta[i]->gtid_known; - binlogpos = tbrm_meta[i]->binlog_pos; serverid = tbrm_meta[i]->server_id; sparam[0].buffer_length = strlen(dbtable); @@ -523,6 +556,356 @@ tbrm_write_metadata( } +/***********************************************************************//** +Read table replication server metadata from the MySQL master server. +This function will create necessary database and table if they are not +yet created. +@return false if read failed, true if read succeeded */ +bool +tbrm_read_server_metadata( +/*======================*/ + const char *master_host, /*!< in: Master hostname */ + const char *user, /*!< in: username */ + const char *passwd, /*!< in: password */ + unsigned int master_port, /*!< in: master port */ + tbr_server_t **tbrm_servers,/*!< out: table replication server + metadata. */ + size_t *tbrm_rows) /*!< out: number of rows read */ +{ + unsigned int myerrno=0; + boost::uint64_t nrows=0; + boost::uint64_t i=0; + MYSQL_RES *result = NULL; + + tbrm_create_metadata(master_host, user, passwd, master_port); + + MYSQL *con = mysql_init(NULL); + + if (!con) { + // TODO: start to log error and other messages + return false; + } + + mysql_options(con, MYSQL_READ_DEFAULT_GROUP, "libmysqld_client"); + mysql_options(con, MYSQL_OPT_USE_REMOTE_CONNECTION, NULL); + + if (!mysql_real_connect(con, master_host, user, passwd, NULL, master_port, NULL, 0)) { + tbrm_report_error(con, "Error: mysql_real_connect failed", __FILE__, __LINE__); + goto error_exit; + } + + mysql_query(con, "USE SKYSQL_GATEWAY_METADATA"); + myerrno = mysql_errno(con); + + if (myerrno != 0) { + tbrm_report_error(con, "Error: Database set failed", __FILE__, __LINE__); + goto error_exit; + } + + mysql_query(con, "SELECT * FROM TABLE_REPLICATION_SERVERS"); + myerrno = mysql_errno(con); + + if (myerrno != 0) { + tbrm_report_error(con,"Error: Select from table_replication_consistency failed", __FILE__, __LINE__); + goto error_exit; + } + + result = mysql_store_result(con); + + if (!result) { + tbrm_report_error(con, "Error: mysql_store_result failed", __FILE__, __LINE__); + goto error_exit; + } + + nrows = mysql_num_rows(result); + + *tbrm_servers = (tbr_server_t*) calloc(nrows, sizeof(tbr_server_t)); + *tbrm_rows = nrows; + + for(i=0;i < nrows; i++) { + MYSQL_ROW row = mysql_fetch_row(result); + unsigned long *lengths = mysql_fetch_lengths(result); + // SERVER_ID + tbrm_servers[i]->server_id = atol(row[0]); + // BINLOG_POS + tbrm_servers[i]->binlog_pos = atoll(row[1]); + // GTID + tbrm_servers[i]->gtid = (unsigned char *)malloc((lengths[2])*sizeof(unsigned char)); + memcpy(tbrm_servers[i]->gtid, row[2], lengths[2]); + tbrm_servers[i]->gtid_len = lengths[2]; + // GTID_KNOWN + tbrm_servers[i]->gtid_known = atol(row[3]); + // SERVER_TYPE + tbrm_servers[i]->server_type = atol(row[4]); + } + + mysql_free_result(result); + mysql_close(con); + + return true; + + error_exit: + + if (result) { + mysql_free_result(result); + } + + if (con) { + mysql_close(con); + } + + return false; +} + +/***********************************************************************//** +Write table replication server metadata from the MySQL master server. +This function assumes that necessary database and table are created. +@return false if read failed, true if read succeeded */ +bool +tbrm_write_server_metadata( +/*=======================*/ + const char *master_host, /*!< in: Master hostname */ + const char *user, /*!< in: username */ + const char *passwd, /*!< in: password */ + unsigned int master_port, /*!< in: master port */ + tbr_server_t **tbrm_servers,/*!< in: table replication server + metadata. */ + size_t tbrm_rows) /*!< in: number of rows read */ +{ + int myerrno=0; + boost::uint32_t i; + MYSQL_STMT *sstmt=NULL; + MYSQL_STMT *istmt=NULL; + MYSQL_STMT *ustmt=NULL; + MYSQL_BIND sparam[1]; + MYSQL_BIND iparam[5]; + MYSQL_BIND uparam[4]; + MYSQL_BIND result[1]; + char *dbtable; + void *gtid; + int gtidknown; + unsigned int serverid; + int servertype; + boost::uint64_t binlogpos; + + // Query to find out if the row already exists on table + const char *sst = "SELECT BINLOG_POS FROM TABLE_REPLICATION_CONSISTENCY WHERE" + " SERVER_ID=?"; + + // Insert Query + const char *ist = "INSERT INTO TABLE_REPLICATION_SERVERS(" + " SERVER_ID, GTID, BINLOG_POS, GTID_KNOWN, SERVER_TYPE) VALUES" + "(?, ?, ?, ?, ?)"; + + // Update Query + const char *ust = "UPDATE TABLE_REPLICATION_SERVERS " + "SET GTID=?, BINLOG_POS=?, GTID_KNOWN=?" + " WHERE SERVER_ID=?"; + + MYSQL *con = mysql_init(NULL); + + if (!con) { + // TODO: start to log error and other messages + return false; + } + + mysql_options(con, MYSQL_READ_DEFAULT_GROUP, "libmysqld_client"); + mysql_options(con, MYSQL_OPT_USE_REMOTE_CONNECTION, NULL); + + if (!mysql_real_connect(con, master_host, user, passwd, NULL, master_port, NULL, 0)) { + tbrm_report_error(con, "Error: mysql_real_connect failed", __FILE__, __LINE__); + goto error_exit; + } + + mysql_query(con, "USE SKYSQL_GATEWAY_METADATA"); + myerrno = mysql_errno(con); + + if (myerrno != 0) { + tbrm_report_error(con, "Error: Database set failed", __FILE__, __LINE__); + } + + // Allocate statement handlers + sstmt = mysql_stmt_init(con); + istmt = mysql_stmt_init(con); + ustmt = mysql_stmt_init(con); + + if (sstmt == NULL || istmt == NULL || ustmt == NULL) { + tbrm_report_error(con, "Could not initialize statement handler", __FILE__, __LINE__); + goto error_exit; + } + + // Prepare the statements + if (mysql_stmt_prepare(sstmt, sst, strlen(sst)) != 0) { + tbrm_stmt_error(sstmt, "Error: Could not prepare select statement", __FILE__, __LINE__); + goto error_exit; + } + if (mysql_stmt_prepare(istmt, ist, strlen(ist)) != 0) { + tbrm_stmt_error(istmt, "Error: Could not prepare insert statement", __FILE__, __LINE__); + goto error_exit; + } + if (mysql_stmt_prepare(ustmt, ust, strlen(ust)) != 0) { + tbrm_stmt_error(ustmt, "Error: Could not prepare update statement", __FILE__, __LINE__); + goto error_exit; + } + + // Initialize the parameters + memset (sparam, 0, sizeof (sparam)); + memset (iparam, 0, sizeof (iparam)); + memset (uparam, 0, sizeof (uparam)); + memset (result, 0, sizeof (result)); + + // Init param structure + // Select + sparam[0].buffer_type = MYSQL_TYPE_LONG; + sparam[0].buffer = (void *) &serverid; + // Insert + iparam[0].buffer_type = MYSQL_TYPE_LONG; + iparam[0].buffer = (void *) &serverid; + iparam[1].buffer_type = MYSQL_TYPE_BLOB; + iparam[1].buffer = (void *) gtid; + iparam[2].buffer_type = MYSQL_TYPE_LONGLONG; + iparam[2].buffer = (void *) &binlogpos; + iparam[3].buffer_type = MYSQL_TYPE_SHORT; + iparam[3].buffer = (void *) >idknown; + iparam[4].buffer_type = MYSQL_TYPE_LONG; + iparam[4].buffer = (void *) &servertype; + // Update + uparam[0].buffer_type = MYSQL_TYPE_BLOB; + uparam[0].buffer = (void *) gtid; + uparam[1].buffer_type = MYSQL_TYPE_LONGLONG; + uparam[1].buffer = (void *) &binlogpos; + uparam[2].buffer_type = MYSQL_TYPE_SHORT; + uparam[2].buffer = (void *) >idknown; + uparam[3].buffer_type = MYSQL_TYPE_LONG; + uparam[3].buffer = (void *) &serverid; + // Result set for select + result[0].buffer_type = MYSQL_TYPE_LONGLONG; + result[0].buffer = &binlogpos; + + + // Iterate through the data + for(i = 0; i < tbrm_rows; i++) { + // Start from Select, we need to know if the consistency + // information for this table, server pair is already + // in metadata or not. + + gtid = (char *)tbrm_servers[i]->gtid; + gtidknown = tbrm_servers[i]->gtid_known; + serverid = tbrm_servers[i]->server_id; + servertype = tbrm_servers[i]->server_type; + + uparam[0].buffer_length = tbrm_servers[i]->gtid_len; + iparam[1].buffer_length = tbrm_servers[i]->gtid_len; + + // Bind param structure to statement + if (mysql_stmt_bind_param(sstmt, sparam) != 0) { + tbrm_stmt_error(sstmt, "Error: Could not bind select parameters", __FILE__, __LINE__); + goto error_exit; + } + + // Bind result structure to statement + if (mysql_stmt_bind_result(sstmt, result) != 0) { + tbrm_stmt_error(sstmt, "Error: Could not bind select return parameters", __FILE__, __LINE__); + goto error_exit; + } + + // Execute!! + if (mysql_stmt_execute(sstmt) != 0) { + tbrm_stmt_error(sstmt, "Error: Could not execute select statement", __FILE__, __LINE__); + goto error_exit; + } + + // Store result + if (mysql_stmt_store_result(sstmt) != 0) { + tbrm_stmt_error(sstmt, "Error: Could not buffer result set", __FILE__, __LINE__); + goto error_exit; + } + + // Fetch result + myerrno = mysql_stmt_fetch(sstmt); + if (myerrno == 1) { + tbrm_stmt_error(sstmt, "Error: Could not fetch result set", __FILE__, __LINE__); + goto error_exit; + } + + // If fetch returned 0, it means that this table, serverid + // pair was found from metadata, we might need to update + // the consistency information. + if (myerrno == 0 && binlogpos != tbrm_servers[i]->binlog_pos) { + // Update the consistency information + binlogpos = tbrm_servers[i]->binlog_pos; + + // Bind param structure to statement + if (mysql_stmt_bind_param(ustmt, uparam) != 0) { + tbrm_stmt_error(ustmt, "Error: Could not bind update parameters", __FILE__, __LINE__); + goto error_exit; + } + // Execute!! + if (mysql_stmt_execute(ustmt) != 0) { + tbrm_stmt_error(ustmt, "Error: Could not execute update statement", __FILE__, __LINE__); + goto error_exit; + } + if (tbr_debug) { + skygw_log_write_flush(NULL, LOGFILE_TRACE, + (char *)"TRC Debug: Metadata state updated for %s in server %d is binlog_pos %lu gtid '%s'", + dbtable, serverid, binlogpos, gtid); + } + + } else { + // Insert the consistency information + binlogpos = tbrm_servers[i]->binlog_pos; + // Bind param structure to statement + if (mysql_stmt_bind_param(istmt, iparam) != 0) { + tbrm_stmt_error(istmt, "Error: Could not bind insert parameters", __FILE__, __LINE__); + goto error_exit; + } + + // Execute!! + if (mysql_stmt_execute(istmt) != 0) { + tbrm_stmt_error(istmt, "Error: Could not execute insert statement", __FILE__, __LINE__); + goto error_exit; + } + + if (tbr_debug) { + skygw_log_write_flush(NULL, LOGFILE_TRACE, + (char *)"TRC Debug: Metadata state inserted for %s in server %d is binlog_pos %lu gtid '%s'", + dbtable, serverid, binlogpos, gtid); + } + } + } + + return true; + + error_exit: + // Cleanup + if (sstmt) { + if (mysql_stmt_close(sstmt)) { + tbrm_stmt_error(sstmt, "Error: Could not close select statement", __FILE__, __LINE__); + } + } + + if (istmt) { + if (mysql_stmt_close(istmt)) { + tbrm_stmt_error(istmt, "Error: Could not close select statement", __FILE__, __LINE__); + } + } + + if (ustmt) { + if (mysql_stmt_close(ustmt)) { + tbrm_stmt_error(ustmt, "Error: Could not close select statement", __FILE__, __LINE__); + } + } + + if (con) { + mysql_close(con); + } + + return false; + +} + + + } // table_replication_metadata } // mysql diff --git a/table_replication_consistency/table_replication_metadata.h b/table_replication_consistency/table_replication_metadata.h index 6ec016ed3..23ac8499e 100644 --- a/table_replication_consistency/table_replication_metadata.h +++ b/table_replication_consistency/table_replication_metadata.h @@ -29,7 +29,7 @@ namespace mysql { namespace table_replication_metadata { -/* Structure definition for table replication oconsistency metadata */ +/* Structure definition for table replication consistency metadata */ typedef struct { unsigned char* db_table; /* Fully qualified db.table name, primary key. */ @@ -40,13 +40,30 @@ typedef struct { bool gtid_known; /* Is gtid known ? */ } tbr_metadata_t; +/* Structure definition for table replication server metadata */ +typedef struct { + boost::uint32_t server_id; /* Server id, primary key*/ + boost::uint64_t binlog_pos; /* Last executed binlog position */ + unsigned char* gtid; /* Last executed global transaction + id if known */ + boost::uint32_t gtid_len; /* Actual length of gtid */ + bool gtid_known; /* 1 if gtid known, 0 if not */ + boost::uint32_t server_type; /* server type */ + } tbr_server_t; + +// Not really nice, but currently we support only these two +// server types. +#define TRC_SERVER_TYPE_MARIADB = 1, +#define TRC_SERVER_TYPE_MYSQL = 2 + + /***********************************************************************//** Read table replication consistency metadata from the MySQL master server. This function assumes that necessary database and table are created. @return false if read failed, true if read succeeded */ bool -tbrm_read_metadata( -/*===============*/ +tbrm_read_consistency_metadata( +/*===========================*/ const char *master_host, /*!< in: Master hostname */ const char *user, /*!< in: username */ const char *passwd, /*!< in: password */ @@ -55,13 +72,28 @@ tbrm_read_metadata( metadata. */ size_t *tbrm_rows); /*!< out: number of rows read */ +/***********************************************************************//** +Read table replication server metadata from the MySQL master server. +This function assumes that necessary database and table are created. +@return false if read failed, true if read succeeded */ +bool +tbrm_read_server_metadata( +/*======================*/ + const char *master_host, /*!< in: Master hostname */ + const char *user, /*!< in: username */ + const char *passwd, /*!< in: password */ + unsigned int master_port, /*!< in: master port */ + tbr_server_t **tbrm_server, /*!< out: table replication server + metadata. */ + size_t *tbrm_rows); /*!< out: number of rows read */ + /***********************************************************************//** Write table replication consistency metadata from the MySQL master server. This function assumes that necessary database and table are created. @return false if read failed, true if read succeeded */ bool -tbrm_write_metadata( -/*================*/ +tbrm_write_consistency_metadata( +/*============================*/ const char *master_host, /*!< in: Master hostname */ const char *user, /*!< in: username */ const char *passwd, /*!< in: password */ @@ -70,6 +102,22 @@ tbrm_write_metadata( metadata. */ size_t tbrm_rows); /*!< in: number of rows read */ +/***********************************************************************//** +Write table replication server metadata from the MySQL master server. +This function assumes that necessary database and table are created. +@return false if read failed, true if read succeeded */ +bool +tbrm_write_server_metadata( +/*=======================*/ + const char *master_host, /*!< in: Master hostname */ + const char *user, /*!< in: username */ + const char *passwd, /*!< in: password */ + unsigned int master_port, /*!< in: master port */ + tbr_server_t **tbrm_server, /*!< out: table replication server + metadata. */ + size_t *tbrm_rows); /*!< out: number of rows read */ + + } // table_replication_metadata } // mysql diff --git a/table_replication_consistency/test/CMakeLists.txt b/table_replication_consistency/test/CMakeLists.txt index d88b15a5d..8cf572378 100644 --- a/table_replication_consistency/test/CMakeLists.txt +++ b/table_replication_consistency/test/CMakeLists.txt @@ -7,8 +7,6 @@ LINK_DIRECTORIES(${Boost_LIBRARY_DIRS}) INCLUDE_DIRECTORIES(${Boost_INCLUDE_DIR}) # Find MySQL client library and header files -find_library(MySQL_LIBRARY NAMES libmysqld.a PATHS -/usr/lib64/mysql /usr/lib/mysql /usr/local/mysql/lib) find_path(MySQL_INCLUDE_DIR mysql.h /usr/local/include/mysql /usr/include/mysql) include_directories(${MySQL_INCLUDE_DIR}) @@ -24,11 +22,11 @@ include_directories(${TRC_INCLUDE_DIR}) # Build rule for example foreach(prog Example) - ADD_EXECUTABLE(${prog} ${prog}.c ../../utils/skygw_utils.o ) - TARGET_LINK_LIBRARIES(${prog} table_replication_consistency.a replication boost_system pthread stdc++ ${MySQL_LIBRARY} crypt aio log_manager) + ADD_EXECUTABLE(${prog} ${prog}.c ../../utils/skygw_utils.o /usr/local/mysql/lib/libmysqld.a) + TARGET_LINK_LIBRARIES(${prog} table_replication_consistency.a replication boost_system boost_thread pthread stdc++ crypt aio log_manager) endforeach() foreach(prog test) - ADD_EXECUTABLE(${prog} ${prog}.cpp ../../utils/skygw_utils.o ) - TARGET_LINK_LIBRARIES(${prog} table_replication_consistency.a replication boost_system pthread stdc++ ${MySQL_LIBRARY} crypt aio log_manager) + ADD_EXECUTABLE(${prog} ${prog}.cpp ../../utils/skygw_utils.o /usr/local/mysql/lib/libmysqld.a) + TARGET_LINK_LIBRARIES(${prog} table_replication_consistency.a replication boost_system boost_thread pthread stdc++ crypt aio log_manager) endforeach() diff --git a/table_replication_consistency/test/Example b/table_replication_consistency/test/Example index ad96ea580..3dfeb89b7 100755 Binary files a/table_replication_consistency/test/Example and b/table_replication_consistency/test/Example differ