diff --git a/build_gateway.inc b/build_gateway.inc index 50289b58a..32d46e62b 100644 --- a/build_gateway.inc +++ b/build_gateway.inc @@ -23,7 +23,7 @@ UNIX := # makefile.inc and build_gateway.inc are located. # ROOT_PATH is used in makefile. # -ROOT_PATH := +ROOT_PATH := /home/jan/skysql/skygateway/skygateway # MARIADB_SRC_PATH may be defined either as an environment variable or # specifically here diff --git a/replication_listener/include/gtid.h b/replication_listener/include/gtid.h index fbd0403ba..cd9c48a8d 100644 --- a/replication_listener/include/gtid.h +++ b/replication_listener/include/gtid.h @@ -52,7 +52,7 @@ class Gtid Gtid() : m_real_gtid(false), m_domain_id(0), m_server_id(0), m_sequence_number(0), - m_server_type(MYSQL_SERVER_TYPE_NA), m_gtid_length(0) + m_server_type(MYSQL_SERVER_TYPE_NA), m_gtid_length(0), m_mariadb_gtid(std::string("")) { memset(m_mysql_gtid, 0, MYSQL_GTID_ENCODED_SIZE); } diff --git a/table_replication_consistency/CMakeLists.txt b/table_replication_consistency/CMakeLists.txt index 5183c9e5c..3d74ec933 100644 --- a/table_replication_consistency/CMakeLists.txt +++ b/table_replication_consistency/CMakeLists.txt @@ -4,11 +4,7 @@ cmake_minimum_required (VERSION 2.6) # This configuration file builds both the static and shared version of # the library. -set(table_replication_consistency_sources - table_replication_consistency.cpp - table_replication_listener.cpp - table_replication_parser.cpp - table_replication_metadata.cpp) +set(table_replication_consistency_sources table_replication_consistency.cpp table_replication_listener.cpp table_replication_parser.cpp table_replication_metadata.cpp) # ---------- Find Boost Headers/Libraries ----------------------- SET(Boost_DEBUG FALSE) @@ -24,7 +20,7 @@ LINK_DIRECTORIES(${Boost_LIBRARY_DIRS}) INCLUDE_DIRECTORIES(${Boost_INCLUDE_DIR}) # Find MySQL client library and header files -find_library(MySQL_LIBRARY NAMES mysqld.a PATHS +find_library(MySQL_LIBRARY NAMES libmysqld.a PATHS /usr/lib64/mysql /usr/lib/mysql /usr/local/mysql/lib) find_path(MySQL_INCLUDE_DIR mysql.h /usr/local/include/mysql /usr/include/mysql) @@ -35,15 +31,25 @@ find_path(SkySQL_INCLUDE_DIR skygw_debug.h /usr/local/include /usr/include ../utils) include_directories(${SkySQL_INCLUDE_DIR}) +#log_manager +FIND_LIBRARY(LIB_LOGMANAGER log_manager /lib /opt/local/lib /opt/lib /usr/lib /usr/local/lib) +find_path(LogManager_INCLUDE_DIR log_manager.h + /usr/local/include /usr/include ../log_manager) +include_directories(${LogManager_INCLUDE_DIR}) + +#debug settings +SET(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -DTBR_DEBUG") +SET(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -DTBR_DEBUG") + # Configure for building static library add_library(table_replication_consistency_static STATIC ${table_replication_consistency_sources}) -target_link_libraries(table_replication_consistency_static crypto ${Boost_LIBRARIES} ${MySQL_LIBRARY}) +target_link_libraries(table_replication_consistency_static crypto log_manager ${Boost_LIBRARIES} ${MySQL_LIBRARY}) set_target_properties(table_replication_consistency_static PROPERTIES OUTPUT_NAME "table_replication_consistency") # Configure for building shared library add_library(table_replication_consistency_shared SHARED ${table_replication_consistency_sources}) -target_link_libraries(table_replication_consistency_shared crypto ${Boost_LIBRARIES} ${MySQL_LIBRARY}) +target_link_libraries(table_replication_consistency_shared crypto log_manager ${Boost_LIBRARIES} ${MySQL_LIBRARY}) set_target_properties(table_replication_consistency_shared PROPERTIES VERSION 0.1 SOVERSION 1 diff --git a/table_replication_consistency/cmake_install.cmake b/table_replication_consistency/cmake_install.cmake index 19eb2623d..41b8b0f11 100644 --- a/table_replication_consistency/cmake_install.cmake +++ b/table_replication_consistency/cmake_install.cmake @@ -33,35 +33,11 @@ IF(NOT DEFINED CMAKE_INSTALL_SO_NO_EXE) ENDIF(NOT DEFINED CMAKE_INSTALL_SO_NO_EXE) IF(NOT CMAKE_INSTALL_COMPONENT OR "${CMAKE_INSTALL_COMPONENT}" STREQUAL "Unspecified") - FOREACH(file - "$ENV{DESTDIR}${CMAKE_INSTALL_PREFIX}/lib/libtable_replication_consistency.so.0.1" - "$ENV{DESTDIR}${CMAKE_INSTALL_PREFIX}/lib/libtable_replication_consistency.so.1" - "$ENV{DESTDIR}${CMAKE_INSTALL_PREFIX}/lib/libtable_replication_consistency.so" - ) - IF(EXISTS "${file}" AND - NOT IS_SYMLINK "${file}") - FILE(RPATH_CHECK - FILE "${file}" - RPATH "") - ENDIF() - ENDFOREACH() FILE(INSTALL DESTINATION "${CMAKE_INSTALL_PREFIX}/lib" TYPE SHARED_LIBRARY FILES - "/home/jan/skysql/skygateway/skygateway/table_replication_consistency/libtable_replication_consistency.so.0.1" - "/home/jan/skysql/skygateway/skygateway/table_replication_consistency/libtable_replication_consistency.so.1" - "/home/jan/skysql/skygateway/skygateway/table_replication_consistency/libtable_replication_consistency.so" + "/home/jan/skysql/skygateway/skygateway/table_replication_consistency/CMakeFiles/CMakeRelink.dir/libtable_replication_consistency.so.0.1" + "/home/jan/skysql/skygateway/skygateway/table_replication_consistency/CMakeFiles/CMakeRelink.dir/libtable_replication_consistency.so.1" + "/home/jan/skysql/skygateway/skygateway/table_replication_consistency/CMakeFiles/CMakeRelink.dir/libtable_replication_consistency.so" ) - FOREACH(file - "$ENV{DESTDIR}${CMAKE_INSTALL_PREFIX}/lib/libtable_replication_consistency.so.0.1" - "$ENV{DESTDIR}${CMAKE_INSTALL_PREFIX}/lib/libtable_replication_consistency.so.1" - "$ENV{DESTDIR}${CMAKE_INSTALL_PREFIX}/lib/libtable_replication_consistency.so" - ) - IF(EXISTS "${file}" AND - NOT IS_SYMLINK "${file}") - IF(CMAKE_INSTALL_DO_STRIP) - EXECUTE_PROCESS(COMMAND "/usr/bin/strip" "${file}") - ENDIF(CMAKE_INSTALL_DO_STRIP) - ENDIF() - ENDFOREACH() ENDIF(NOT CMAKE_INSTALL_COMPONENT OR "${CMAKE_INSTALL_COMPONENT}" STREQUAL "Unspecified") IF(NOT CMAKE_INSTALL_COMPONENT OR "${CMAKE_INSTALL_COMPONENT}" STREQUAL "Unspecified") diff --git a/table_replication_consistency/table_replication_consistency.cpp b/table_replication_consistency/table_replication_consistency.cpp index 2b2dae92e..90ad45429 100644 --- a/table_replication_consistency/table_replication_consistency.cpp +++ b/table_replication_consistency/table_replication_consistency.cpp @@ -33,19 +33,39 @@ Updated: #include #include "table_replication_consistency.h" #include "table_replication_listener.h" +#include "table_replication_metadata.h" #include "listener_exception.h" - +#include "log_manager.h" /* Global memory */ pthread_t *replication_listener_tid = NULL; +pthread_t *replication_listener_metadata_tid=NULL; unsigned int n_replication_listeners = 0; +bool listener_shutdown=false; /* This flag will be true + at shutdown */ + + +#ifdef TBR_DEBUG +bool tbr_trace = true; +bool tbr_debug = true; +#else +#ifdef TBR_TRACE +bool tbr_trace = true; +#else +bool tbr_trace = false; +#endif +bool tbr_trace = false; +bool tbr_debug = false; +#endif + /* Namespaces */ using namespace std; using namespace mysql; using namespace table_replication_listener; +using namespace table_replication_metadata; /***********************************************************************//** This function will register replication listener for every server @@ -58,25 +78,63 @@ tb_replication_consistency_init( replication_listener_t *rpl, /*!< in: Server definition. */ size_t n_servers, /*!< in: Number of servers */ - unsigned int gateway_server_id) /*!< in: Gateway slave + unsigned int gateway_server_id, /*!< in: Gateway slave server id. */ + int trace_level) /*!< in: Trace level */ { boost::uint32_t i; int err = 0; string errmsg=""; + // Allocate memory for thread identifiers replication_listener_tid = (pthread_t*)malloc(sizeof(pthread_t) * (n_servers + 1)); if (replication_listener_tid == NULL) { - errmsg = string("Table_Replication_Consistency: out of memory"); + errmsg = string("Fatal: Table_Replication_Consistency: out of memory"); goto error_handling; } + replication_listener_metadata_tid = (pthread_t*)malloc(sizeof(pthread_t)); + + if (replication_listener_metadata_tid == NULL) { + free(replication_listener_tid); + errmsg = string("Fatal: Table_Replication_Consistency: out of memory"); + goto error_handling; + } + + // Set up trace level + if (trace_level & TBR_TRACE_DEBUG) { + tbr_debug = true; + } + + if (trace_level & TBR_TRACE_TRACE) { + tbr_trace = true; + } + + // Start replication stream reader thread for every server in the configuration for(i=0;i < n_servers; i++) { + // We need to try catch all exceptions here because function + // calling this service could be implemented using C-language + // thus we need to protect it. try { rpl[i].gateway_slave_server_id = gateway_server_id; rpl[i].listener_id = i; + // For master we start also metadata updater + if (rpl[i].is_master) { + err = pthread_create( + replication_listener_metadata_tid, + NULL, + &(tb_replication_listener_metadata_updater), + (void *)&(rpl[i])); + + if (err) { + errmsg = string(strerror(err)); + goto error_handling; + } + } + + // Start actual replication listener err = pthread_create( &replication_listener_tid[i], NULL, @@ -87,12 +145,16 @@ tb_replication_consistency_init( errmsg = string(strerror(err)); goto error_handling; } + } + // Replication listener will use this exception for + // error handling. catch(ListenerException const& e) { errmsg = e.what(); goto error_handling; } + // Boost library exceptions catch(boost::system::error_code const& e) { errmsg = e.message(); @@ -112,6 +174,7 @@ tb_replication_consistency_init( } } + // Number of threads n_replication_listeners = i; /* We will try to join the threads at shutdown */ return (0); @@ -120,6 +183,10 @@ tb_replication_consistency_init( n_replication_listeners = i; rpl[i].error_message = (char *)malloc(errmsg.size()+1); strcpy(rpl[i].error_message, errmsg.c_str()); + + // This will log error to log file + skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)errmsg.c_str()); + return (1); } @@ -150,7 +217,7 @@ tb_replication_consistency_query( // We need to protect C client from exceptions here try { for(i = 0; i < *n_servers; i++) { - err = tb_replication_listener_consistency(tb_query->db_dot_table, &tb_consistency[i], i); + err = tb_replication_listener_consistency((const unsigned char *)tb_query->db_dot_table, &tb_consistency[i], i); if (err) { goto err_exit; @@ -186,6 +253,8 @@ tb_replication_consistency_query( error_handling: tb_consistency[i].error_message = (char *)malloc(errmsg.size()+1); strcpy(tb_consistency[i].error_message, errmsg.c_str()); + // This will log error to log file + skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)errmsg.c_str()); err_exit: *n_servers=i-1; @@ -246,6 +315,8 @@ tb_replication_consistency_reconnect( error_handling: rpl->error_message = (char *)malloc(errmsg.size()+1); strcpy(rpl->error_message, errmsg.c_str()); + // This will log error to log file + skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)errmsg.c_str()); err_exit: return (1); @@ -267,16 +338,35 @@ tb_replication_consistency_shutdown( // We need to protect C client from exceptions here try { + + // Wait until all replication listeners are shut down for(i = 0; i < n_replication_listeners; i++) { + err = tb_replication_listener_shutdown(i, error_message); if (err) { goto err_exit; } + + // Need to wait until the thread exits + err = pthread_join(replication_listener_tid[i], (void **)error_message); + + if (err) { + goto err_exit; + } } - // Need to wait until the thread exits - err = pthread_join(replication_listener_tid[i], (void **)error_message); + listener_shutdown = true; + + // Wait until metadata writer has shut down + err = pthread_join(*replication_listener_metadata_tid, NULL); + + if (err) { + goto err_exit; + } + + // Write metadata to MySQL storage and clean up + err = tb_replication_listener_done(error_message); if (err) { goto err_exit; @@ -310,6 +400,8 @@ tb_replication_consistency_shutdown( error_handling: *error_message = (char *)malloc(errmsg.size()+1); strcpy(*error_message, errmsg.c_str()); + // This will log error to log file + skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)errmsg.c_str()); err_exit: return (1); diff --git a/table_replication_consistency/table_replication_consistency.h b/table_replication_consistency/table_replication_consistency.h index 28c7e3fed..2ec92b9aa 100644 --- a/table_replication_consistency/table_replication_consistency.h +++ b/table_replication_consistency/table_replication_consistency.h @@ -28,6 +28,9 @@ Updated: #include +/* Global trace variables */ +extern bool tbr_trace; +extern bool tbr_debug; /* Structure definition for replication listener */ typedef struct { @@ -87,6 +90,16 @@ typedef struct table_consistency { server failed. */ } table_consistency_t; +/* Definitions for trace level */ +#define TBR_TRACE_TRACE (1UL << 1) /* Trace only important events and + periodical consistency information */ + +/* Full trace of selected events and consistency information */ +#define TBR_TRACE_DEBUG ((1UL << 2) | TBR_TRACE_TRACE) + +extern bool listener_shutdown; /* This flag will be true + at shutdown */ + EXTERN_C_BLOCK_BEGIN @@ -103,8 +116,9 @@ tb_replication_consistency_init( replication_listener_t *rpl, /*!< in: Server definition. */ size_t n_servers, /*!< in: Number of servers */ - unsigned int gateway_server_id);/*!< in: Gateway slave + unsigned int gateway_server_id, /*!< in: Gateway slave server id. */ + int trace_level); /*!< in: trace level */ /***********************************************************************//** With this fuction client can request table consistency status for a diff --git a/table_replication_consistency/table_replication_listener.cpp b/table_replication_consistency/table_replication_listener.cpp index 9e10a4782..43f0ea88f 100644 --- a/table_replication_consistency/table_replication_listener.cpp +++ b/table_replication_consistency/table_replication_listener.cpp @@ -38,6 +38,7 @@ Updated: #include "table_replication_listener.h" #include "table_replication_parser.h" #include "table_replication_metadata.h" +#include "log_manager.h" using mysql::Binary_log; using mysql::system::create_transport; @@ -45,6 +46,7 @@ using namespace std; using namespace mysql; using namespace system; using namespace table_replication_parser; +using namespace table_replication_metadata; namespace mysql { @@ -66,10 +68,53 @@ boost::mutex table_replication_mutex; /* This mutex is used protect abve data structure from multiple threads */ -bool listener_shutdown = false; /* This flag will be true - at shutdown */ -pthread_t metadata_updater_pid; /* Thread id for metadata - updater*/ +replication_listener_t *master; /* Master server definition */ + +/* Master connect info */ +const char *master_user=NULL; +const char *master_passwd=NULL; +const char *master_host=NULL; +unsigned long master_port=3306; + + +/***********************************************************************//** +Internal function to extract user, passwd, hostname and port from +replication listener url. */ +static void +tbrl_extract_master_connect_info() +/*==============================*/ +{ + char *body = master->server_url; + size_t len = strlen(master->server_url); + + /* Find the beginning of the user name */ + strncmp(body, "//", 2); + + /* Find the user name, which is mandatory */ + master_user = body + 2; + const char *user_end= strpbrk(master_user, ":@"); + + /* Find the password, which can be empty */ + assert(*user_end == ':' || *user_end == '@'); + master_passwd = user_end + 1; // Skip the ':' (or '@') + const char *pass_end = master_passwd; + if (*user_end == ':') + { + pass_end = strchr(master_passwd, '@'); + } + + /* Find the host name, which is mandatory */ + // Skip the '@', if there is one + master_host = *pass_end == '@' ? pass_end + 1 : pass_end; + const char *host_end = strchr(master_host, ':'); + /* If no ':' was found there is no port, so the host end at the end + * of the string */ + if (host_end == 0) + host_end = body + len; + /* Find the port number */ + if (*host_end == ':') + master_port = strtoul(host_end + 1, NULL, 10); +} /***********************************************************************//** Internal function to update table consistency information based @@ -104,17 +149,17 @@ tbrl_update_consistency( if(not_found) { // Consistency for this table and server not found, insert a record - tbr_metadata_t* tb_c = (tbr_metadata_t*) malloc(sizeof(tbr_metadata_t)); - tb_c->db_table = (unsigned char *)malloc(database_dot_table.size()+1); - strcpy((char *)tb_c->db_table, (char *)database_dot_table.c_str()); - tb_c->server_id = lheader->server_id; - tb_c->binlog_pos = lheader->next_position; - tb_c->gtid_known = gtid_known; - tb_c->gtid_len = gtid.get_gtid_length(); - tb_c->gtid = (unsigned char *)malloc(tb_c->gtid_len); - memcpy(tb_c->gtid, gtid.get_gtid(), tb_c->gtid_len); + tc = (tbr_metadata_t*) malloc(sizeof(tbr_metadata_t)); + tc->db_table = (unsigned char *)malloc(database_dot_table.size()+1); + strcpy((char *)tc->db_table, (char *)database_dot_table.c_str()); + tc->server_id = lheader->server_id; + tc->binlog_pos = lheader->next_position; + tc->gtid_known = gtid_known; + tc->gtid_len = gtid.get_gtid_length(); + tc->gtid = (unsigned char *)malloc(tc->gtid_len); + memcpy(tc->gtid, gtid.get_gtid(), tc->gtid_len); - table_consistency_map.insert(pair(database_dot_table,tb_c)); + table_consistency_map.insert(pair(database_dot_table, tc)); } else { // Consistency for this table and server found, update the // consistency values @@ -125,6 +170,14 @@ tbrl_update_consistency( memcpy(tc->gtid, gtid.get_gtid(), tc->gtid_len); tc->gtid_known = gtid_known; } + + if (tbr_trace) { + // This will log error to log file + skygw_log_write_flush(NULL, LOGFILE_TRACE, + (char *)"TRC Trace: Current state for table %s in server %d binlog_pos %lu GTID '%s'", + tc->db_table, tc->server_id, tc->binlog_pos, gtid.get_string().c_str()); + } + } /***********************************************************************//** @@ -139,205 +192,235 @@ void* tb_replication_listener_reader( definition. */ { - replication_listener_t *rlt = (replication_listener_t*)arg; - char *uri = rlt->server_url; - map tid2tname; - map::iterator tb_it; - pthread_t id = pthread_self(); - string database_dot_table; - const char* server_type; - Gtid gtid(0,1,31); - bool gtid_known = false; + replication_listener_t *rlt = (replication_listener_t*)arg; + char *uri = rlt->server_url; + map tid2tname; + map::iterator tb_it; + pthread_t id = pthread_self(); + string database_dot_table; + const char* server_type; + Gtid gtid(0,1,31); + bool gtid_known = false; - try { - Binary_log binlog(create_transport(uri), uri); - binlog.connect(gtid); + try { + Binary_log binlog(create_transport(uri), uri); + binlog.connect(gtid); - { - // Need to be protected by mutex to avoid concurrency problems - boost::mutex::scoped_lock lock(table_replication_mutex); - table_replication_listeners[rlt->listener_id] = &binlog; - } + { + // Need to be protected by mutex to avoid concurrency problems + boost::mutex::scoped_lock lock(table_replication_mutex); + table_replication_listeners[rlt->listener_id] = &binlog; + } - server_type = binlog.get_mysql_server_type_str(); + // Set up the master + if (rlt->is_master) { + master = rlt; + } - cout << "Server " << uri << " type: " << server_type << endl; + server_type = binlog.get_mysql_server_type_str(); - Binary_log_event *event; + if (tbr_trace) { + string trace_msg = "Server " + string(uri) + string(server_type); + skygw_log_write_flush(NULL, LOGFILE_TRACE, (char *)trace_msg.c_str()); + } - // While we have events - while (true) { - Log_event_header *lheader; + Binary_log_event *event; - // Wait for the next event - int result = binlog.wait_for_next_event(&event); + // While we have events + while (true) { + Log_event_header *lheader; - if (result == ERR_EOF) - break; + // Wait for the next event + int result = binlog.wait_for_next_event(&event); - lheader = event->header(); + if (result == ERR_EOF) + break; - switch(event->get_event_type()) { + lheader = event->header(); - case QUERY_EVENT: { - Query_event *qevent = dynamic_cast(event); - int n_tables = 0; + switch(event->get_event_type()) { - // This is overkill but we really do not know how - // many names there are at this state - char **db_names = (char **) malloc(qevent->query.size()+1 * sizeof(char *)); - char **table_names = (char **) malloc(qevent->query.size()+1 * sizeof(char *)); + case QUERY_EVENT: { + Query_event *qevent = dynamic_cast(event); + int n_tables = 0; - // Try to parse db.table names from the SQL-clause - if (tbr_parser_table_names(db_names, table_names, &n_tables, qevent->query.c_str())) { - // Success, set up the consistency - // information for every table - for(int k=0;k < n_tables; k++) { - // Update the consistency - // information + // This is overkill but we really do not know how + // many names there are at this state + char **db_names = (char **) malloc(qevent->query.size()+1 * sizeof(char *)); + char **table_names = (char **) malloc(qevent->query.size()+1 * sizeof(char *)); - if(db_names[k][0]=='\0') { - database_dot_table = qevent->db_name; - } else { - database_dot_table = string(db_names[k]); - } - database_dot_table.append("."); - database_dot_table.append(string(table_names[k])); + // Try to parse db.table names from the SQL-clause + if (tbr_parser_table_names(db_names, table_names, &n_tables, qevent->query.c_str())) { + // Success, set up the consistency + // information for every table + for(int k=0;k < n_tables; k++) { + // Update the consistency + // information - tbrl_update_consistency(lheader, database_dot_table, gtid_known, gtid); + if(db_names[k][0]=='\0') { + database_dot_table = qevent->db_name; + } else { + database_dot_table = string(db_names[k]); + } + database_dot_table.append("."); + database_dot_table.append(string(table_names[k])); - free(db_names[k]); - free(table_names[k]); - } - free(db_names); - free(table_names); - } else { - for(int k=0; k < n_tables; k++) { - free(db_names[k]); - free(table_names[k]); - } - free(db_names); - free(table_names); - } + tbrl_update_consistency(lheader, database_dot_table, gtid_known, gtid); - std::cout << "Thread: " << id << " server_id " << lheader->server_id - << " position " << lheader->next_position << " : Found event of type " - << event->get_event_type() - << " txt " << get_event_type_str(event->get_event_type()) - << " query " << qevent->query << " db " << qevent->db_name - << " gtid " << gtid.get_string() - << std::endl; - break; - } + free(db_names[k]); + free(table_names[k]); + } + free(db_names); + free(table_names); + } else { + for(int k=0; k < n_tables; k++) { + free(db_names[k]); + free(table_names[k]); + } + free(db_names); + free(table_names); + } - /* - Event is global transaction identifier. We need to store - value of this and handle actual state later. - */ - case GTID_EVENT_MARIADB: - case GTID_EVENT_MYSQL: - { - Gtid_event *gevent = dynamic_cast(event); + if (tbr_debug) { + skygw_log_write_flush(NULL, LOGFILE_TRACE, + (char *)"TRC Debug: Thread %ld Server %d Binlog_pos %lu event %d" + " : %s Query %s DB %s gtid '%s'", + id, + lheader->server_id, + lheader->next_position, + event->get_event_type(), + get_event_type_str(event->get_event_type()), + qevent->query.c_str(), + qevent->db_name.c_str(), + gtid.get_string().c_str()); + } + break; + } - if (binlog.get_mysql_server_type() == MYSQL_SERVER_TYPE_MARIADB) { - gtid_known = true; - gtid = Gtid(gevent->domain_id, gevent->server_id, gevent->sequence_number); - } else { - gtid_known = true; - gtid = Gtid(gevent->m_mysql_gtid); - } + /* + Event is global transaction identifier. We need to store + value of this and handle actual state later. + */ + case GTID_EVENT_MARIADB: + case GTID_EVENT_MYSQL: { + Gtid_event *gevent = dynamic_cast(event); - std::cout << "Thread: " << id << " server_id " << lheader->server_id - << " position " << lheader->next_position << " : Found event of type " - << event->get_event_type() - << " txt " << get_event_type_str(event->get_event_type()) - << " gtid " << gtid.get_string() - << std::endl; + if (binlog.get_mysql_server_type() == MYSQL_SERVER_TYPE_MARIADB) { + gtid_known = true; + gtid = Gtid(gevent->domain_id, gevent->server_id, gevent->sequence_number); + } else { + gtid_known = true; + gtid = Gtid(gevent->m_mysql_gtid); + } + if (tbr_debug) { + skygw_log_write_flush(NULL, LOGFILE_TRACE, + (char *)"TRC Debug: Thread %ld Server %d Binlog_pos %lu event %d" + " : %s gtid '%s'", + id, + lheader->server_id, + lheader->next_position, + event->get_event_type(), + get_event_type_str(event->get_event_type()), + gtid.get_string().c_str()); + } - break; + break; + } - } + // With this event we know to which database and table the + // following events will be targeted. + case TABLE_MAP_EVENT: { + Table_map_event *table_map_event= dynamic_cast(event); + database_dot_table= table_map_event->db_name; + database_dot_table.append("."); + database_dot_table.append(table_map_event->table_name); + tid2tname[table_map_event->table_id]= database_dot_table; - // With this event we know to which database and table the - // following events will be targeted. - case TABLE_MAP_EVENT: - { - Table_map_event *table_map_event= dynamic_cast(event); - database_dot_table= table_map_event->db_name; - database_dot_table.append("."); - database_dot_table.append(table_map_event->table_name); - tid2tname[table_map_event->table_id]= database_dot_table; - break; - } + if (tbr_debug) { + skygw_log_write_flush(NULL, LOGFILE_TRACE, + (char *)"TRC Debug: Thread %ld Server %d Binlog_pos %lu event %d" + " : %s dbtable '%s' id %d", + id, + lheader->server_id, + lheader->next_position, + event->get_event_type(), + get_event_type_str(event->get_event_type()), + database_dot_table.c_str(), + table_map_event->table_id); + } - /* This is row based replication event containing INSERT, - UPDATE or DELETE clause broken to rows */ - case WRITE_ROWS_EVENT: - case UPDATE_ROWS_EVENT: - case DELETE_ROWS_EVENT: - { - Row_event *revent = dynamic_cast(event); - tb_it= tid2tname.begin(); - tb_it= tid2tname.find(revent->table_id); - if (tb_it != tid2tname.end()) - { - database_dot_table= tb_it->second; - } + break; + } - // Update the consistency information - tbrl_update_consistency(lheader, database_dot_table, gtid_known, gtid); + /* This is row based replication event containing INSERT, + UPDATE or DELETE clause broken to rows */ + case WRITE_ROWS_EVENT: + case UPDATE_ROWS_EVENT: + case DELETE_ROWS_EVENT: { + Row_event *revent = dynamic_cast(event); + tb_it= tid2tname.begin(); + tb_it= tid2tname.find(revent->table_id); - std::cout << "Thread: " << id << " server_id " << lheader->server_id - << " position " << lheader->next_position << " : Found event of type " - << event->get_event_type() - << " txt " << get_event_type_str(event->get_event_type()) - << " table " << revent->table_id - << " tb " << database_dot_table - << " gtid " << gtid.get_string() - << std::endl; - break; + // DB.table name found + if (tb_it != tid2tname.end()) + { + database_dot_table= tb_it->second; + } - } - // Default event handler, do nothing - default: - break; - } // switch - } // while - } // try - catch(ListenerException e) - { - std::cerr << "Listener exception: " << e.what() << std::endl; - // Re-Throw this one. - throw; - } - catch(boost::system::error_code e) - { - std::cerr << "Listener system error: " << e.message() << std::endl; - // Re-Throw this one. - throw; - } - // Try and catch all exceptions - catch(std::exception const& e) - { - std::cerr << "Listener other error: " << e.what() << std::endl; - // Re-Throw this one. - throw; - } - // Rest of them - catch(...) - { - std::cerr << "Unknown exception: " << std::endl; - // Re-Throw this one. - // It was not handled so you want to make sure it is handled correctly by - // the OS. So just allow the exception to keep propagating. - throw; - } + // Update the consistency information + tbrl_update_consistency(lheader, database_dot_table, gtid_known, gtid); - // Thread execution will end here - pthread_exit(NULL); - return NULL; + break; + } + // Default event handler, do nothing + default: + break; + } // switch + } // while + } // try + catch(ListenerException e) + { + string err = std::string("Listener exception: ")+ e.what(); + skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str()); + // Re-Throw this one. + throw; + } + catch(boost::system::error_code e) + { + string err = std::string("Listener system exception: ")+ e.message(); + skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str()); + // Re-Throw this one. + throw; + } + // Try and catch all exceptions + catch(std::exception const& e) + { + string err = std::string("Listener other exception: ")+ e.what(); + skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str()); + // Re-Throw this one. + throw; + } + // Rest of them + catch(...) + { + string err = std::string("Unknown exception: "); + skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str()); + // Re-Throw this one. + // It was not handled so you want to make sure it is handled correctly by + // the OS. So just allow the exception to keep propagating. + throw; + } + + if (tbr_trace) { + string trace_msg = string("Listener for server ") + string(uri) + string(server_type) + string(" shutting down"); + skygw_log_write_flush(NULL, LOGFILE_TRACE, (char *)trace_msg.c_str()); + } + + // Thread execution will end here + pthread_exit(NULL); + return NULL; } /***********************************************************************//** @@ -361,32 +444,43 @@ tb_replication_listener_shutdown( if ( b_it != table_replication_listeners.end()) { Binary_log *binlog = (*b_it).second; + + if (tbr_debug) { + skygw_log_write_flush(NULL, LOGFILE_TRACE, + (char *)"TRC Debug: Shutting down replication listener for server %s", + binlog->get_url().c_str()); + } + try { binlog->shutdown(); } catch(ListenerException e) { - std::cerr << "Listener exception: " << e.what() << std::endl; + string err = std::string("Listener exception: ")+ e.what(); + skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str()); // Re-Throw this one. throw; } catch(boost::system::error_code e) { - std::cerr << "Listener system error: " << e.message() << std::endl; + string err = std::string("Listener system exception: ")+ e.message(); + skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str()); // Re-Throw this one. throw; } // Try and catch all exceptions catch(std::exception const& e) { - std::cerr << "Listener other error: " << e.what() << std::endl; + string err = std::string("Listener other exception: ")+ e.what(); + skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str()); // Re-Throw this one. throw; } // Rest of them catch(...) { - std::cerr << "Unknown exception: " << std::endl; + string err = std::string("Unknown exception: "); + skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str()); // Re-Throw this one. // It was not handled so you want to make sure it is handled correctly by // the OS. So just allow the exception to keep propagating. @@ -398,6 +492,9 @@ tb_replication_listener_shutdown( std::string err = std::string("Replication listener for server_id = ") + to_string(server_id) + std::string(" not active "); *error_message = (char *)malloc(err.size()+1); strcpy(*error_message, err.c_str()); + + skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str()); + return (1); } } @@ -412,7 +509,7 @@ there is information how many results where available. int tb_replication_listener_consistency( /*================================*/ - const char *db_dot_table, /*!< in: Fully qualified table + const unsigned char *db_dot_table, /*!< in: Fully qualified table name. */ table_consistency_t *tb_consistency, /*!< out: Consistency values. */ boost::uint32_t server_no) /*!< in: Server */ @@ -425,7 +522,7 @@ tb_replication_listener_consistency( boost::mutex::scoped_lock lock(table_consistency_mutex); // Loop through the consistency values - for(multimap::iterator i = table_consistency_map.find(db_dot_table); + for(multimap::iterator i = table_consistency_map.find((char *)db_dot_table); i != table_consistency_map.end(); ++i, ++cur_server) { if (cur_server == server_no) { tc = (*i).second; @@ -436,6 +533,12 @@ tb_replication_listener_consistency( } if (found) { + if (tbr_trace) { + // This will log error to log file + skygw_log_write_flush(NULL, LOGFILE_TRACE, + (char *)"TRC Trace: Current state for table %s in server %d binlog_pos %lu GTID '%s'", + tc->db_table, tc->server_id, tc->binlog_pos, tc->gtid); + } return (1); } else { return (0); @@ -473,6 +576,13 @@ tb_replication_listener_reconnect( } if (found) { + + if (tbr_debug) { + skygw_log_write_flush(NULL, LOGFILE_TRACE, + (char *)"TRC Debug: Reconnecting to server %s", + binlog->get_url().c_str()); + } + try { // Shutdown the current listener thread binlog->shutdown(); @@ -503,33 +613,38 @@ tb_replication_listener_reconnect( } catch(ListenerException e) { - std::cerr << "Listener exception: " << e.what() << std::endl; + string err = std::string("Listener exception: ")+ e.what(); + skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str()); // Re-Throw this one. throw; } catch(boost::system::error_code e) { - std::cerr << "Listener system error: " << e.message() << std::endl; + string err = std::string("Listener system exception: ")+ e.message(); + skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str()); // Re-Throw this one. throw; } // Try and catch all exceptions catch(std::exception const& e) { - std::cerr << "Listener other error: " << e.what() << std::endl; + string err = std::string("Listener other exception: ")+ e.what(); + skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str()); // Re-Throw this one. throw; } // Rest of them catch(...) { - std::cerr << "Unknown exception: " << std::endl; + string err = std::string("Unknown exception: "); + skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str()); // Re-Throw this one. // It was not handled so you want to make sure it is handled correctly by // the OS. So just allow the exception to keep propagating. throw; } + } else { errmsg = std::string("Replication listener not found"); error_message = (char *)errmsg.c_str(); @@ -541,6 +656,7 @@ err_exit: if (error_message) { rpl->error_message = (char *)malloc(strlen(error_message +1)); strcpy(rpl->error_message, error_message); + skygw_log_write_flush(NULL, LOGFILE_ERROR, error_message); } return (1); @@ -556,45 +672,17 @@ void /*======================================*/ void *arg) /*!< in: Master definition */ { - replication_listener_t *master = (replication_listener_t*)arg; + master = (replication_listener_t*)arg; tbr_metadata_t **tm=NULL; + bool err = false; - char *body = master->server_url; - size_t len = strlen(master->server_url); - - /* Find the beginning of the user name */ - strncmp(body, "//", 2); - - /* Find the user name, which is mandatory */ - const char *user = body + 2; - const char *user_end= strpbrk(user, ":@"); - - /* Find the password, which can be empty */ - assert(*user_end == ':' || *user_end == '@'); - const char *const pass = user_end + 1; // Skip the ':' (or '@') - const char *pass_end = pass; - if (*user_end == ':') - { - pass_end = strchr(pass, '@'); - } - - /* Find the host name, which is mandatory */ - // Skip the '@', if there is one - const char *host = *pass_end == '@' ? pass_end + 1 : pass_end; - const char *host_end = strchr(host, ':'); - /* If no ':' was found there is no port, so the host end at the end - * of the string */ - if (host_end == 0) - host_end = body + len; - /* Find the port number */ - unsigned long portno = 3306; - if (*host_end == ':') - portno = strtoul(host_end + 1, NULL, 10); + // Set up master connect info + tbrl_extract_master_connect_info(); while(listener_shutdown == false) { sleep(10000); // Sleep ~10 seconds - { + try { // Need to be protected by mutex to avoid concurrency problems boost::mutex::scoped_lock lock(table_consistency_mutex); @@ -614,14 +702,126 @@ void lock.unlock(); // Insert or update metadata information - tbrm_write_metadata(host, user, pass, portno, tm, nelems); + err = tbrm_write_metadata( + (const char *)master_host, + (const char *)master_user, + (const char *)master_passwd, + (unsigned int)master_port, + tm, + nelems); + + free(tm); + } + catch(ListenerException e) + { + string err = std::string("Listener exception: ")+ e.what(); + skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str()); + goto my_exit; + } + catch(boost::system::error_code e) + { + string err = std::string("Listener system exception: ")+ e.message(); + skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str()); + goto my_exit; + } + // Try and catch all exceptions + catch(std::exception const& e) + { + string err = std::string("Listener other exception: ")+ e.what(); + skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str()); + goto my_exit; + } + // Rest of them + catch(...) + { + string err = std::string("Unknown exception: "); + skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str()); + goto my_exit; + } + } + +my_exit: + if (tbr_trace) { + skygw_log_write_flush(NULL, LOGFILE_TRACE, (char *)"Shutting down the metadata updater thread"); } pthread_exit(NULL); return NULL; } +/***********************************************************************//** +Write current state of the metadata to the MySQL server and +clean up the data structures. +@return 0 on success, error code at failure. */ +int +tb_replication_listener_done( +/*==========================*/ + char **error_message) /*!< out: error message */ +{ + size_t nelems = table_consistency_map.size(); + size_t k =0; + tbr_metadata_t **tm=NULL; + int err = 0; + + tm = (tbr_metadata_t**)calloc(nelems, sizeof(tbr_metadata_t*)); + + try { + for(multimap::iterator i = table_consistency_map.begin(); + i != table_consistency_map.end(); ++i,++k) { + tm[k] = ((*i).second); + } + + // Insert or update metadata information + err = tbrm_write_metadata(master_host, master_user, master_passwd, master_port, tm, (size_t)nelems); + + free(tm); + + // Clean up memory allocation for multimap items + for(multimap::iterator i = table_consistency_map.begin(); + i != table_consistency_map.end(); ++i,++k) { + tbr_metadata_t *trm = ((*i).second); + + free(trm->db_table); + free(trm->gtid); + + table_consistency_map.erase(i); + free(trm); + } + + // Clean up binlog listeners + table_replication_listeners.erase(table_replication_listeners.begin(), table_replication_listeners.end()); + } + catch(ListenerException e) + { + string err = std::string("Listener exception: ")+ e.what(); + skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str()); + } + catch(boost::system::error_code e) + { + string err = std::string("Listener system exception: ")+ e.message(); + skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str()); + } + // Try and catch all exceptions + catch(std::exception const& e) + { + string err = std::string("Listener other exception: ")+ e.what(); + skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str()); + } + // Rest of them + catch(...) + { + string err = std::string("Unknown exception: "); + skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str()); + } + + if (tbr_trace) { + skygw_log_write_flush(NULL, LOGFILE_TRACE, (char *)"Shutting down the listeners"); + } + + return err; +} + } // namespace table_replication_listener } // namespace mysql diff --git a/table_replication_consistency/table_replication_listener.h b/table_replication_consistency/table_replication_listener.h index bb917f90a..f3373dc62 100644 --- a/table_replication_consistency/table_replication_listener.h +++ b/table_replication_consistency/table_replication_listener.h @@ -81,6 +81,25 @@ tb_replication_listener_shutdown( boost::uint32_t server_id, /*!< in: server id */ char **error_message); /*!< out: error message */ +/***********************************************************************//** +This internal function is executed on its own thread and it will write +table consistency information to the master database in every n seconds +based on configuration. +*/ +void +*tb_replication_listener_metadata_updater( +/*======================================*/ + void *arg); /*!< in: Master definition */ + +/***********************************************************************//** +Write current state of the metadata to the MySQL server and +clean up the data structures. +@return 0 on success, error code at failure. */ +int +tb_replication_listener_done( +/*==========================*/ + char **error_message); /*!< out: error message */ + } // table_replication_listener diff --git a/table_replication_consistency/table_replication_metadata.cpp b/table_replication_consistency/table_replication_metadata.cpp index 720b1784b..897364099 100644 --- a/table_replication_consistency/table_replication_metadata.cpp +++ b/table_replication_consistency/table_replication_metadata.cpp @@ -36,6 +36,8 @@ Updated: #include #include #include "table_replication_metadata.h" +#include "table_replication_consistency.h" +#include "log_manager.h" namespace mysql { @@ -51,9 +53,11 @@ tbrm_report_error( const char *file, int line) { - fprintf(stderr, "%s at file %s line %d\n", message, file, line); + skygw_log_write_flush(NULL, LOGFILE_ERROR, + (char *)"%s at file %s line %d", message, file, line); if (con != NULL) { - fprintf(stderr, "%s\n", mysql_error(con)); + skygw_log_write_flush(NULL, LOGFILE_ERROR, + (char *)"%s", mysql_error(con)); mysql_close(con); } } @@ -68,10 +72,13 @@ tbrm_stmt_error( const char *file, int line) { - fprintf (stderr, "%s at file %s line %d\n", message, file, line); + skygw_log_write_flush(NULL, LOGFILE_ERROR, + (char *)"%s at file %s line %d", message, file, line); + if (stmt != NULL) { - fprintf (stderr, "Error %u (%s): %s\n", + skygw_log_write_flush(NULL, LOGFILE_ERROR, + (char *)"Error %u (%s): %s\n", mysql_stmt_errno (stmt), mysql_stmt_sqlstate (stmt), mysql_stmt_error (stmt)); @@ -280,7 +287,7 @@ tbrm_write_metadata( unsigned int master_port, /*!< in: master port */ tbr_metadata_t **tbrm_meta, /*!< in: table replication consistency metadata. */ - boost::uint32_t tbrm_rows) /*!< in: number of rows read */ + size_t tbrm_rows) /*!< in: number of rows read */ { MYSQL *con = mysql_init(NULL); int myerrno=0; @@ -457,6 +464,12 @@ tbrm_write_metadata( tbrm_stmt_error(ustmt, "Error: Could not execute update statement", __FILE__, __LINE__); goto error_exit; } + if (tbr_debug) { + skygw_log_write_flush(NULL, LOGFILE_TRACE, + (char *)"TRC Debug: Metadata state updated for %s in server %d is binlog_pos %lu gtid '%s'", + dbtable, serverid, binlogpos, gtid); + } + } else { // Insert the consistency information binlogpos = tbrm_meta[i]->binlog_pos; @@ -471,6 +484,12 @@ tbrm_write_metadata( tbrm_stmt_error(istmt, "Error: Could not execute insert statement", __FILE__, __LINE__); goto error_exit; } + + if (tbr_debug) { + skygw_log_write_flush(NULL, LOGFILE_TRACE, + (char *)"TRC Debug: Metadata state inserted for %s in server %d is binlog_pos %lu gtid '%s'", + dbtable, serverid, binlogpos, gtid); + } } } diff --git a/table_replication_consistency/table_replication_metadata.h b/table_replication_consistency/table_replication_metadata.h index cced1782e..6ec016ed3 100644 --- a/table_replication_consistency/table_replication_metadata.h +++ b/table_replication_consistency/table_replication_metadata.h @@ -24,6 +24,11 @@ Updated: #ifndef TABLE_REPLICATION_METADATA_H #define TABLE_REPLICATION_METADATA_H +namespace mysql { + +namespace table_replication_metadata { + + /* Structure definition for table replication oconsistency metadata */ typedef struct { unsigned char* db_table; /* Fully qualified db.table name, @@ -65,6 +70,10 @@ tbrm_write_metadata( metadata. */ size_t tbrm_rows); /*!< in: number of rows read */ -#endif +} // table_replication_metadata + +} // mysql + +#endif diff --git a/table_replication_consistency/table_replication_parser.cpp b/table_replication_consistency/table_replication_parser.cpp index ea47e73ee..56684f09b 100644 --- a/table_replication_consistency/table_replication_parser.cpp +++ b/table_replication_consistency/table_replication_parser.cpp @@ -29,6 +29,8 @@ Updated: #include #include "table_replication_parser.h" +#include "table_replication_consistency.h" +#include "log_manager.h" namespace mysql { @@ -362,6 +364,12 @@ tbr_parser_table_names( db_name[name_count] = dbname; table_name[name_count] = tbname; name_count++; + + if (tbr_debug) { + skygw_log_write_flush(NULL, LOGFILE_TRACE, + (char *)"TRC Debug: INSERT OR REPLACE to %s.%s", + dbname, tbname); + } } else { free(dbname); free(tbname); @@ -394,6 +402,12 @@ tbr_parser_table_names( db_name[name_count] = dbname; table_name[name_count] = tbname; name_count++; + + if (tbr_debug) { + skygw_log_write_flush(NULL, LOGFILE_TRACE, + (char *)"TRC Debug: DELETE OR UPDATE to %s.%s", + dbname, tbname); + } } else { free(dbname); free(tbname); @@ -417,6 +431,12 @@ tbr_parser_table_names( db_name[name_count] = dbname; table_name[name_count] = tbname; name_count++; + + if (tbr_debug) { + skygw_log_write_flush(NULL, LOGFILE_TRACE, + (char *)"TRC Debug: LOAD to %s.%s", + dbname, tbname); + } } else { free(dbname); free(tbname); diff --git a/table_replication_consistency/test/CMakeLists.txt b/table_replication_consistency/test/CMakeLists.txt index 67249312f..d88b15a5d 100644 --- a/table_replication_consistency/test/CMakeLists.txt +++ b/table_replication_consistency/test/CMakeLists.txt @@ -18,8 +18,17 @@ find_path(SkySQL_INCLUDE_DIR skygw_debug.h /usr/local/include /usr/include ../../utils) include_directories(${SkySQL_INCLUDE_DIR}) +find_path(TRC_INCLUDE_DIR table_replication_consistency.h + ../ /usr/include /usr/local/include) +include_directories(${TRC_INCLUDE_DIR}) + # Build rule for example foreach(prog Example) - ADD_EXECUTABLE(${prog} ${prog}.c) - TARGET_LINK_LIBRARIES(${prog} table_replication_consistency.a replication boost_system pthread stdc++ ${MySQL_LIBRARY} crypt aio) + ADD_EXECUTABLE(${prog} ${prog}.c ../../utils/skygw_utils.o ) + TARGET_LINK_LIBRARIES(${prog} table_replication_consistency.a replication boost_system pthread stdc++ ${MySQL_LIBRARY} crypt aio log_manager) +endforeach() + +foreach(prog test) + ADD_EXECUTABLE(${prog} ${prog}.cpp ../../utils/skygw_utils.o ) + TARGET_LINK_LIBRARIES(${prog} table_replication_consistency.a replication boost_system pthread stdc++ ${MySQL_LIBRARY} crypt aio log_manager) endforeach() diff --git a/table_replication_consistency/test/Example b/table_replication_consistency/test/Example index a4a733af5..ad96ea580 100755 Binary files a/table_replication_consistency/test/Example and b/table_replication_consistency/test/Example differ diff --git a/table_replication_consistency/test/Example.c b/table_replication_consistency/test/Example.c index fce6600c3..cf09ee095 100644 --- a/table_replication_consistency/test/Example.c +++ b/table_replication_consistency/test/Example.c @@ -1,10 +1,13 @@ -#include "table_replication_consistency.h" #include #include #include #include #include #include +#ifndef bool +#define bool int +#endif +#include "table_replication_consistency.h" static char* server_options[] = { "jan test", @@ -24,13 +27,14 @@ static char* server_groups[] = { NULL }; -int main(int argc, char** argv) +int main(int argc, char** argv) { int i=0,k=0; char *uri; replication_listener_t *mrl; int err=0; + char *errstr=NULL; // This will initialize MySQL if (mysql_server_init(num_elements, server_options, server_groups)) { @@ -62,7 +66,7 @@ int main(int argc, char** argv) } }//end of outer while loop - err = tb_replication_consistency_init(mrl, k, 5); + err = tb_replication_consistency_init(mrl, k, 5, TBR_TRACE_DEBUG); if (err ) { perror(NULL); @@ -73,6 +77,13 @@ int main(int argc, char** argv) sleep(3); } + err = tb_replication_consistency_shutdown(&errstr); + + if (*errstr) { + fprintf(stderr, "%s\n", errstr); + free(errstr); + } + exit(0); } diff --git a/table_replication_consistency/test/Makefile b/table_replication_consistency/test/Makefile index 2faced78f..d30acc542 100644 --- a/table_replication_consistency/test/Makefile +++ b/table_replication_consistency/test/Makefile @@ -111,6 +111,19 @@ Example/fast: $(MAKE) -f CMakeFiles/Example.dir/build.make CMakeFiles/Example.dir/build .PHONY : Example/fast +#============================================================================= +# Target rules for targets named test + +# Build rule for target. +test: cmake_check_build_system + $(MAKE) -f CMakeFiles/Makefile2 test +.PHONY : test + +# fast build rule for target. +test/fast: + $(MAKE) -f CMakeFiles/test.dir/build.make CMakeFiles/test.dir/build +.PHONY : test/fast + Example.o: Example.c.o .PHONY : Example.o @@ -135,6 +148,30 @@ Example.c.s: $(MAKE) -f CMakeFiles/Example.dir/build.make CMakeFiles/Example.dir/Example.c.s .PHONY : Example.c.s +test.o: test.cpp.o +.PHONY : test.o + +# target to build an object file +test.cpp.o: + $(MAKE) -f CMakeFiles/test.dir/build.make CMakeFiles/test.dir/test.cpp.o +.PHONY : test.cpp.o + +test.i: test.cpp.i +.PHONY : test.i + +# target to preprocess a source file +test.cpp.i: + $(MAKE) -f CMakeFiles/test.dir/build.make CMakeFiles/test.dir/test.cpp.i +.PHONY : test.cpp.i + +test.s: test.cpp.s +.PHONY : test.s + +# target to generate assembly for a file +test.cpp.s: + $(MAKE) -f CMakeFiles/test.dir/build.make CMakeFiles/test.dir/test.cpp.s +.PHONY : test.cpp.s + # Help Target help: @echo "The following are some of the valid targets for this Makefile:" @@ -144,9 +181,13 @@ help: @echo "... Example" @echo "... edit_cache" @echo "... rebuild_cache" + @echo "... test" @echo "... Example.o" @echo "... Example.i" @echo "... Example.s" + @echo "... test.o" + @echo "... test.i" + @echo "... test.s" .PHONY : help diff --git a/table_replication_consistency/test/cmake_install.cmake b/table_replication_consistency/test/cmake_install.cmake index 54c6cc19f..f6b1d3578 100644 --- a/table_replication_consistency/test/cmake_install.cmake +++ b/table_replication_consistency/test/cmake_install.cmake @@ -12,7 +12,7 @@ IF(NOT DEFINED CMAKE_INSTALL_CONFIG_NAME) STRING(REGEX REPLACE "^[^A-Za-z0-9_]+" "" CMAKE_INSTALL_CONFIG_NAME "${BUILD_TYPE}") ELSE(BUILD_TYPE) - SET(CMAKE_INSTALL_CONFIG_NAME "") + SET(CMAKE_INSTALL_CONFIG_NAME "Debug") ENDIF(BUILD_TYPE) MESSAGE(STATUS "Install configuration: \"${CMAKE_INSTALL_CONFIG_NAME}\"") ENDIF(NOT DEFINED CMAKE_INSTALL_CONFIG_NAME)