Fixed issues on compiling and linking the library against example and test

This commit is contained in:
Jan Lindström
2013-07-24 15:49:36 +03:00
parent 521a66f9c1
commit 2fc7cb5525
16 changed files with 699 additions and 283 deletions

View File

@ -23,7 +23,7 @@ UNIX :=
# makefile.inc and build_gateway.inc are located. # makefile.inc and build_gateway.inc are located.
# ROOT_PATH is used in makefile. # ROOT_PATH is used in makefile.
# #
ROOT_PATH := ROOT_PATH := /home/jan/skysql/skygateway/skygateway
# MARIADB_SRC_PATH may be defined either as an environment variable or # MARIADB_SRC_PATH may be defined either as an environment variable or
# specifically here # specifically here

View File

@ -52,7 +52,7 @@ class Gtid
Gtid() Gtid()
: m_real_gtid(false), m_domain_id(0), m_server_id(0), m_sequence_number(0), : m_real_gtid(false), m_domain_id(0), m_server_id(0), m_sequence_number(0),
m_server_type(MYSQL_SERVER_TYPE_NA), m_gtid_length(0) m_server_type(MYSQL_SERVER_TYPE_NA), m_gtid_length(0), m_mariadb_gtid(std::string(""))
{ {
memset(m_mysql_gtid, 0, MYSQL_GTID_ENCODED_SIZE); memset(m_mysql_gtid, 0, MYSQL_GTID_ENCODED_SIZE);
} }

View File

@ -4,11 +4,7 @@ cmake_minimum_required (VERSION 2.6)
# This configuration file builds both the static and shared version of # This configuration file builds both the static and shared version of
# the library. # the library.
set(table_replication_consistency_sources set(table_replication_consistency_sources table_replication_consistency.cpp table_replication_listener.cpp table_replication_parser.cpp table_replication_metadata.cpp)
table_replication_consistency.cpp
table_replication_listener.cpp
table_replication_parser.cpp
table_replication_metadata.cpp)
# ---------- Find Boost Headers/Libraries ----------------------- # ---------- Find Boost Headers/Libraries -----------------------
SET(Boost_DEBUG FALSE) SET(Boost_DEBUG FALSE)
@ -24,7 +20,7 @@ LINK_DIRECTORIES(${Boost_LIBRARY_DIRS})
INCLUDE_DIRECTORIES(${Boost_INCLUDE_DIR}) INCLUDE_DIRECTORIES(${Boost_INCLUDE_DIR})
# Find MySQL client library and header files # Find MySQL client library and header files
find_library(MySQL_LIBRARY NAMES mysqld.a PATHS find_library(MySQL_LIBRARY NAMES libmysqld.a PATHS
/usr/lib64/mysql /usr/lib/mysql /usr/local/mysql/lib) /usr/lib64/mysql /usr/lib/mysql /usr/local/mysql/lib)
find_path(MySQL_INCLUDE_DIR mysql.h find_path(MySQL_INCLUDE_DIR mysql.h
/usr/local/include/mysql /usr/include/mysql) /usr/local/include/mysql /usr/include/mysql)
@ -35,15 +31,25 @@ find_path(SkySQL_INCLUDE_DIR skygw_debug.h
/usr/local/include /usr/include ../utils) /usr/local/include /usr/include ../utils)
include_directories(${SkySQL_INCLUDE_DIR}) include_directories(${SkySQL_INCLUDE_DIR})
#log_manager
FIND_LIBRARY(LIB_LOGMANAGER log_manager /lib /opt/local/lib /opt/lib /usr/lib /usr/local/lib)
find_path(LogManager_INCLUDE_DIR log_manager.h
/usr/local/include /usr/include ../log_manager)
include_directories(${LogManager_INCLUDE_DIR})
#debug settings
SET(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -DTBR_DEBUG")
SET(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -DTBR_DEBUG")
# Configure for building static library # Configure for building static library
add_library(table_replication_consistency_static STATIC ${table_replication_consistency_sources}) add_library(table_replication_consistency_static STATIC ${table_replication_consistency_sources})
target_link_libraries(table_replication_consistency_static crypto ${Boost_LIBRARIES} ${MySQL_LIBRARY}) target_link_libraries(table_replication_consistency_static crypto log_manager ${Boost_LIBRARIES} ${MySQL_LIBRARY})
set_target_properties(table_replication_consistency_static PROPERTIES set_target_properties(table_replication_consistency_static PROPERTIES
OUTPUT_NAME "table_replication_consistency") OUTPUT_NAME "table_replication_consistency")
# Configure for building shared library # Configure for building shared library
add_library(table_replication_consistency_shared SHARED ${table_replication_consistency_sources}) add_library(table_replication_consistency_shared SHARED ${table_replication_consistency_sources})
target_link_libraries(table_replication_consistency_shared crypto ${Boost_LIBRARIES} ${MySQL_LIBRARY}) target_link_libraries(table_replication_consistency_shared crypto log_manager ${Boost_LIBRARIES} ${MySQL_LIBRARY})
set_target_properties(table_replication_consistency_shared PROPERTIES set_target_properties(table_replication_consistency_shared PROPERTIES
VERSION 0.1 SOVERSION 1 VERSION 0.1 SOVERSION 1

View File

@ -33,35 +33,11 @@ IF(NOT DEFINED CMAKE_INSTALL_SO_NO_EXE)
ENDIF(NOT DEFINED CMAKE_INSTALL_SO_NO_EXE) ENDIF(NOT DEFINED CMAKE_INSTALL_SO_NO_EXE)
IF(NOT CMAKE_INSTALL_COMPONENT OR "${CMAKE_INSTALL_COMPONENT}" STREQUAL "Unspecified") IF(NOT CMAKE_INSTALL_COMPONENT OR "${CMAKE_INSTALL_COMPONENT}" STREQUAL "Unspecified")
FOREACH(file
"$ENV{DESTDIR}${CMAKE_INSTALL_PREFIX}/lib/libtable_replication_consistency.so.0.1"
"$ENV{DESTDIR}${CMAKE_INSTALL_PREFIX}/lib/libtable_replication_consistency.so.1"
"$ENV{DESTDIR}${CMAKE_INSTALL_PREFIX}/lib/libtable_replication_consistency.so"
)
IF(EXISTS "${file}" AND
NOT IS_SYMLINK "${file}")
FILE(RPATH_CHECK
FILE "${file}"
RPATH "")
ENDIF()
ENDFOREACH()
FILE(INSTALL DESTINATION "${CMAKE_INSTALL_PREFIX}/lib" TYPE SHARED_LIBRARY FILES FILE(INSTALL DESTINATION "${CMAKE_INSTALL_PREFIX}/lib" TYPE SHARED_LIBRARY FILES
"/home/jan/skysql/skygateway/skygateway/table_replication_consistency/libtable_replication_consistency.so.0.1" "/home/jan/skysql/skygateway/skygateway/table_replication_consistency/CMakeFiles/CMakeRelink.dir/libtable_replication_consistency.so.0.1"
"/home/jan/skysql/skygateway/skygateway/table_replication_consistency/libtable_replication_consistency.so.1" "/home/jan/skysql/skygateway/skygateway/table_replication_consistency/CMakeFiles/CMakeRelink.dir/libtable_replication_consistency.so.1"
"/home/jan/skysql/skygateway/skygateway/table_replication_consistency/libtable_replication_consistency.so" "/home/jan/skysql/skygateway/skygateway/table_replication_consistency/CMakeFiles/CMakeRelink.dir/libtable_replication_consistency.so"
) )
FOREACH(file
"$ENV{DESTDIR}${CMAKE_INSTALL_PREFIX}/lib/libtable_replication_consistency.so.0.1"
"$ENV{DESTDIR}${CMAKE_INSTALL_PREFIX}/lib/libtable_replication_consistency.so.1"
"$ENV{DESTDIR}${CMAKE_INSTALL_PREFIX}/lib/libtable_replication_consistency.so"
)
IF(EXISTS "${file}" AND
NOT IS_SYMLINK "${file}")
IF(CMAKE_INSTALL_DO_STRIP)
EXECUTE_PROCESS(COMMAND "/usr/bin/strip" "${file}")
ENDIF(CMAKE_INSTALL_DO_STRIP)
ENDIF()
ENDFOREACH()
ENDIF(NOT CMAKE_INSTALL_COMPONENT OR "${CMAKE_INSTALL_COMPONENT}" STREQUAL "Unspecified") ENDIF(NOT CMAKE_INSTALL_COMPONENT OR "${CMAKE_INSTALL_COMPONENT}" STREQUAL "Unspecified")
IF(NOT CMAKE_INSTALL_COMPONENT OR "${CMAKE_INSTALL_COMPONENT}" STREQUAL "Unspecified") IF(NOT CMAKE_INSTALL_COMPONENT OR "${CMAKE_INSTALL_COMPONENT}" STREQUAL "Unspecified")

View File

@ -33,19 +33,39 @@ Updated:
#include <boost/system/system_error.hpp> #include <boost/system/system_error.hpp>
#include "table_replication_consistency.h" #include "table_replication_consistency.h"
#include "table_replication_listener.h" #include "table_replication_listener.h"
#include "table_replication_metadata.h"
#include "listener_exception.h" #include "listener_exception.h"
#include "log_manager.h"
/* Global memory */ /* Global memory */
pthread_t *replication_listener_tid = NULL; pthread_t *replication_listener_tid = NULL;
pthread_t *replication_listener_metadata_tid=NULL;
unsigned int n_replication_listeners = 0; unsigned int n_replication_listeners = 0;
bool listener_shutdown=false; /* This flag will be true
at shutdown */
#ifdef TBR_DEBUG
bool tbr_trace = true;
bool tbr_debug = true;
#else
#ifdef TBR_TRACE
bool tbr_trace = true;
#else
bool tbr_trace = false;
#endif
bool tbr_trace = false;
bool tbr_debug = false;
#endif
/* Namespaces */ /* Namespaces */
using namespace std; using namespace std;
using namespace mysql; using namespace mysql;
using namespace table_replication_listener; using namespace table_replication_listener;
using namespace table_replication_metadata;
/***********************************************************************//** /***********************************************************************//**
This function will register replication listener for every server This function will register replication listener for every server
@ -58,25 +78,63 @@ tb_replication_consistency_init(
replication_listener_t *rpl, /*!< in: Server replication_listener_t *rpl, /*!< in: Server
definition. */ definition. */
size_t n_servers, /*!< in: Number of servers */ size_t n_servers, /*!< in: Number of servers */
unsigned int gateway_server_id) /*!< in: Gateway slave unsigned int gateway_server_id, /*!< in: Gateway slave
server id. */ server id. */
int trace_level) /*!< in: Trace level */
{ {
boost::uint32_t i; boost::uint32_t i;
int err = 0; int err = 0;
string errmsg=""; string errmsg="";
// Allocate memory for thread identifiers
replication_listener_tid = (pthread_t*)malloc(sizeof(pthread_t) * (n_servers + 1)); replication_listener_tid = (pthread_t*)malloc(sizeof(pthread_t) * (n_servers + 1));
if (replication_listener_tid == NULL) { if (replication_listener_tid == NULL) {
errmsg = string("Table_Replication_Consistency: out of memory"); errmsg = string("Fatal: Table_Replication_Consistency: out of memory");
goto error_handling; goto error_handling;
} }
replication_listener_metadata_tid = (pthread_t*)malloc(sizeof(pthread_t));
if (replication_listener_metadata_tid == NULL) {
free(replication_listener_tid);
errmsg = string("Fatal: Table_Replication_Consistency: out of memory");
goto error_handling;
}
// Set up trace level
if (trace_level & TBR_TRACE_DEBUG) {
tbr_debug = true;
}
if (trace_level & TBR_TRACE_TRACE) {
tbr_trace = true;
}
// Start replication stream reader thread for every server in the configuration
for(i=0;i < n_servers; i++) { for(i=0;i < n_servers; i++) {
// We need to try catch all exceptions here because function
// calling this service could be implemented using C-language
// thus we need to protect it.
try { try {
rpl[i].gateway_slave_server_id = gateway_server_id; rpl[i].gateway_slave_server_id = gateway_server_id;
rpl[i].listener_id = i; rpl[i].listener_id = i;
// For master we start also metadata updater
if (rpl[i].is_master) {
err = pthread_create(
replication_listener_metadata_tid,
NULL,
&(tb_replication_listener_metadata_updater),
(void *)&(rpl[i]));
if (err) {
errmsg = string(strerror(err));
goto error_handling;
}
}
// Start actual replication listener
err = pthread_create( err = pthread_create(
&replication_listener_tid[i], &replication_listener_tid[i],
NULL, NULL,
@ -87,12 +145,16 @@ tb_replication_consistency_init(
errmsg = string(strerror(err)); errmsg = string(strerror(err));
goto error_handling; goto error_handling;
} }
} }
// Replication listener will use this exception for
// error handling.
catch(ListenerException const& e) catch(ListenerException const& e)
{ {
errmsg = e.what(); errmsg = e.what();
goto error_handling; goto error_handling;
} }
// Boost library exceptions
catch(boost::system::error_code const& e) catch(boost::system::error_code const& e)
{ {
errmsg = e.message(); errmsg = e.message();
@ -112,6 +174,7 @@ tb_replication_consistency_init(
} }
} }
// Number of threads
n_replication_listeners = i; n_replication_listeners = i;
/* We will try to join the threads at shutdown */ /* We will try to join the threads at shutdown */
return (0); return (0);
@ -120,6 +183,10 @@ tb_replication_consistency_init(
n_replication_listeners = i; n_replication_listeners = i;
rpl[i].error_message = (char *)malloc(errmsg.size()+1); rpl[i].error_message = (char *)malloc(errmsg.size()+1);
strcpy(rpl[i].error_message, errmsg.c_str()); strcpy(rpl[i].error_message, errmsg.c_str());
// This will log error to log file
skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)errmsg.c_str());
return (1); return (1);
} }
@ -150,7 +217,7 @@ tb_replication_consistency_query(
// We need to protect C client from exceptions here // We need to protect C client from exceptions here
try { try {
for(i = 0; i < *n_servers; i++) { for(i = 0; i < *n_servers; i++) {
err = tb_replication_listener_consistency(tb_query->db_dot_table, &tb_consistency[i], i); err = tb_replication_listener_consistency((const unsigned char *)tb_query->db_dot_table, &tb_consistency[i], i);
if (err) { if (err) {
goto err_exit; goto err_exit;
@ -186,6 +253,8 @@ tb_replication_consistency_query(
error_handling: error_handling:
tb_consistency[i].error_message = (char *)malloc(errmsg.size()+1); tb_consistency[i].error_message = (char *)malloc(errmsg.size()+1);
strcpy(tb_consistency[i].error_message, errmsg.c_str()); strcpy(tb_consistency[i].error_message, errmsg.c_str());
// This will log error to log file
skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)errmsg.c_str());
err_exit: err_exit:
*n_servers=i-1; *n_servers=i-1;
@ -246,6 +315,8 @@ tb_replication_consistency_reconnect(
error_handling: error_handling:
rpl->error_message = (char *)malloc(errmsg.size()+1); rpl->error_message = (char *)malloc(errmsg.size()+1);
strcpy(rpl->error_message, errmsg.c_str()); strcpy(rpl->error_message, errmsg.c_str());
// This will log error to log file
skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)errmsg.c_str());
err_exit: err_exit:
return (1); return (1);
@ -267,16 +338,35 @@ tb_replication_consistency_shutdown(
// We need to protect C client from exceptions here // We need to protect C client from exceptions here
try { try {
// Wait until all replication listeners are shut down
for(i = 0; i < n_replication_listeners; i++) { for(i = 0; i < n_replication_listeners; i++) {
err = tb_replication_listener_shutdown(i, error_message); err = tb_replication_listener_shutdown(i, error_message);
if (err) { if (err) {
goto err_exit; goto err_exit;
} }
// Need to wait until the thread exits
err = pthread_join(replication_listener_tid[i], (void **)error_message);
if (err) {
goto err_exit;
}
} }
// Need to wait until the thread exits listener_shutdown = true;
err = pthread_join(replication_listener_tid[i], (void **)error_message);
// Wait until metadata writer has shut down
err = pthread_join(*replication_listener_metadata_tid, NULL);
if (err) {
goto err_exit;
}
// Write metadata to MySQL storage and clean up
err = tb_replication_listener_done(error_message);
if (err) { if (err) {
goto err_exit; goto err_exit;
@ -310,6 +400,8 @@ tb_replication_consistency_shutdown(
error_handling: error_handling:
*error_message = (char *)malloc(errmsg.size()+1); *error_message = (char *)malloc(errmsg.size()+1);
strcpy(*error_message, errmsg.c_str()); strcpy(*error_message, errmsg.c_str());
// This will log error to log file
skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)errmsg.c_str());
err_exit: err_exit:
return (1); return (1);

View File

@ -28,6 +28,9 @@ Updated:
#include <skygw_debug.h> #include <skygw_debug.h>
/* Global trace variables */
extern bool tbr_trace;
extern bool tbr_debug;
/* Structure definition for replication listener */ /* Structure definition for replication listener */
typedef struct { typedef struct {
@ -87,6 +90,16 @@ typedef struct table_consistency {
server failed. */ server failed. */
} table_consistency_t; } table_consistency_t;
/* Definitions for trace level */
#define TBR_TRACE_TRACE (1UL << 1) /* Trace only important events and
periodical consistency information */
/* Full trace of selected events and consistency information */
#define TBR_TRACE_DEBUG ((1UL << 2) | TBR_TRACE_TRACE)
extern bool listener_shutdown; /* This flag will be true
at shutdown */
EXTERN_C_BLOCK_BEGIN EXTERN_C_BLOCK_BEGIN
@ -103,8 +116,9 @@ tb_replication_consistency_init(
replication_listener_t *rpl, /*!< in: Server replication_listener_t *rpl, /*!< in: Server
definition. */ definition. */
size_t n_servers, /*!< in: Number of servers */ size_t n_servers, /*!< in: Number of servers */
unsigned int gateway_server_id);/*!< in: Gateway slave unsigned int gateway_server_id, /*!< in: Gateway slave
server id. */ server id. */
int trace_level); /*!< in: trace level */
/***********************************************************************//** /***********************************************************************//**
With this fuction client can request table consistency status for a With this fuction client can request table consistency status for a

View File

@ -38,6 +38,7 @@ Updated:
#include "table_replication_listener.h" #include "table_replication_listener.h"
#include "table_replication_parser.h" #include "table_replication_parser.h"
#include "table_replication_metadata.h" #include "table_replication_metadata.h"
#include "log_manager.h"
using mysql::Binary_log; using mysql::Binary_log;
using mysql::system::create_transport; using mysql::system::create_transport;
@ -45,6 +46,7 @@ using namespace std;
using namespace mysql; using namespace mysql;
using namespace system; using namespace system;
using namespace table_replication_parser; using namespace table_replication_parser;
using namespace table_replication_metadata;
namespace mysql { namespace mysql {
@ -66,10 +68,53 @@ boost::mutex table_replication_mutex; /* This mutex is used protect
abve data structure from abve data structure from
multiple threads */ multiple threads */
bool listener_shutdown = false; /* This flag will be true replication_listener_t *master; /* Master server definition */
at shutdown */
pthread_t metadata_updater_pid; /* Thread id for metadata /* Master connect info */
updater*/ const char *master_user=NULL;
const char *master_passwd=NULL;
const char *master_host=NULL;
unsigned long master_port=3306;
/***********************************************************************//**
Internal function to extract user, passwd, hostname and port from
replication listener url. */
static void
tbrl_extract_master_connect_info()
/*==============================*/
{
char *body = master->server_url;
size_t len = strlen(master->server_url);
/* Find the beginning of the user name */
strncmp(body, "//", 2);
/* Find the user name, which is mandatory */
master_user = body + 2;
const char *user_end= strpbrk(master_user, ":@");
/* Find the password, which can be empty */
assert(*user_end == ':' || *user_end == '@');
master_passwd = user_end + 1; // Skip the ':' (or '@')
const char *pass_end = master_passwd;
if (*user_end == ':')
{
pass_end = strchr(master_passwd, '@');
}
/* Find the host name, which is mandatory */
// Skip the '@', if there is one
master_host = *pass_end == '@' ? pass_end + 1 : pass_end;
const char *host_end = strchr(master_host, ':');
/* If no ':' was found there is no port, so the host end at the end
* of the string */
if (host_end == 0)
host_end = body + len;
/* Find the port number */
if (*host_end == ':')
master_port = strtoul(host_end + 1, NULL, 10);
}
/***********************************************************************//** /***********************************************************************//**
Internal function to update table consistency information based Internal function to update table consistency information based
@ -104,17 +149,17 @@ tbrl_update_consistency(
if(not_found) { if(not_found) {
// Consistency for this table and server not found, insert a record // Consistency for this table and server not found, insert a record
tbr_metadata_t* tb_c = (tbr_metadata_t*) malloc(sizeof(tbr_metadata_t)); tc = (tbr_metadata_t*) malloc(sizeof(tbr_metadata_t));
tb_c->db_table = (unsigned char *)malloc(database_dot_table.size()+1); tc->db_table = (unsigned char *)malloc(database_dot_table.size()+1);
strcpy((char *)tb_c->db_table, (char *)database_dot_table.c_str()); strcpy((char *)tc->db_table, (char *)database_dot_table.c_str());
tb_c->server_id = lheader->server_id; tc->server_id = lheader->server_id;
tb_c->binlog_pos = lheader->next_position; tc->binlog_pos = lheader->next_position;
tb_c->gtid_known = gtid_known; tc->gtid_known = gtid_known;
tb_c->gtid_len = gtid.get_gtid_length(); tc->gtid_len = gtid.get_gtid_length();
tb_c->gtid = (unsigned char *)malloc(tb_c->gtid_len); tc->gtid = (unsigned char *)malloc(tc->gtid_len);
memcpy(tb_c->gtid, gtid.get_gtid(), tb_c->gtid_len); memcpy(tc->gtid, gtid.get_gtid(), tc->gtid_len);
table_consistency_map.insert(pair<std::string, tbr_metadata_t*>(database_dot_table,tb_c)); table_consistency_map.insert(pair<std::string, tbr_metadata_t*>(database_dot_table, tc));
} else { } else {
// Consistency for this table and server found, update the // Consistency for this table and server found, update the
// consistency values // consistency values
@ -125,6 +170,14 @@ tbrl_update_consistency(
memcpy(tc->gtid, gtid.get_gtid(), tc->gtid_len); memcpy(tc->gtid, gtid.get_gtid(), tc->gtid_len);
tc->gtid_known = gtid_known; tc->gtid_known = gtid_known;
} }
if (tbr_trace) {
// This will log error to log file
skygw_log_write_flush(NULL, LOGFILE_TRACE,
(char *)"TRC Trace: Current state for table %s in server %d binlog_pos %lu GTID '%s'",
tc->db_table, tc->server_id, tc->binlog_pos, gtid.get_string().c_str());
}
} }
/***********************************************************************//** /***********************************************************************//**
@ -139,205 +192,235 @@ void* tb_replication_listener_reader(
definition. */ definition. */
{ {
replication_listener_t *rlt = (replication_listener_t*)arg; replication_listener_t *rlt = (replication_listener_t*)arg;
char *uri = rlt->server_url; char *uri = rlt->server_url;
map<int, string> tid2tname; map<int, string> tid2tname;
map<int, string>::iterator tb_it; map<int, string>::iterator tb_it;
pthread_t id = pthread_self(); pthread_t id = pthread_self();
string database_dot_table; string database_dot_table;
const char* server_type; const char* server_type;
Gtid gtid(0,1,31); Gtid gtid(0,1,31);
bool gtid_known = false; bool gtid_known = false;
try { try {
Binary_log binlog(create_transport(uri), uri); Binary_log binlog(create_transport(uri), uri);
binlog.connect(gtid); binlog.connect(gtid);
{ {
// Need to be protected by mutex to avoid concurrency problems // Need to be protected by mutex to avoid concurrency problems
boost::mutex::scoped_lock lock(table_replication_mutex); boost::mutex::scoped_lock lock(table_replication_mutex);
table_replication_listeners[rlt->listener_id] = &binlog; table_replication_listeners[rlt->listener_id] = &binlog;
} }
server_type = binlog.get_mysql_server_type_str(); // Set up the master
if (rlt->is_master) {
master = rlt;
}
cout << "Server " << uri << " type: " << server_type << endl; server_type = binlog.get_mysql_server_type_str();
Binary_log_event *event; if (tbr_trace) {
string trace_msg = "Server " + string(uri) + string(server_type);
skygw_log_write_flush(NULL, LOGFILE_TRACE, (char *)trace_msg.c_str());
}
// While we have events Binary_log_event *event;
while (true) {
Log_event_header *lheader;
// Wait for the next event // While we have events
int result = binlog.wait_for_next_event(&event); while (true) {
Log_event_header *lheader;
if (result == ERR_EOF) // Wait for the next event
break; int result = binlog.wait_for_next_event(&event);
lheader = event->header(); if (result == ERR_EOF)
break;
switch(event->get_event_type()) { lheader = event->header();
case QUERY_EVENT: { switch(event->get_event_type()) {
Query_event *qevent = dynamic_cast<Query_event *>(event);
int n_tables = 0;
// This is overkill but we really do not know how case QUERY_EVENT: {
// many names there are at this state Query_event *qevent = dynamic_cast<Query_event *>(event);
char **db_names = (char **) malloc(qevent->query.size()+1 * sizeof(char *)); int n_tables = 0;
char **table_names = (char **) malloc(qevent->query.size()+1 * sizeof(char *));
// Try to parse db.table names from the SQL-clause // This is overkill but we really do not know how
if (tbr_parser_table_names(db_names, table_names, &n_tables, qevent->query.c_str())) { // many names there are at this state
// Success, set up the consistency char **db_names = (char **) malloc(qevent->query.size()+1 * sizeof(char *));
// information for every table char **table_names = (char **) malloc(qevent->query.size()+1 * sizeof(char *));
for(int k=0;k < n_tables; k++) {
// Update the consistency
// information
if(db_names[k][0]=='\0') { // Try to parse db.table names from the SQL-clause
database_dot_table = qevent->db_name; if (tbr_parser_table_names(db_names, table_names, &n_tables, qevent->query.c_str())) {
} else { // Success, set up the consistency
database_dot_table = string(db_names[k]); // information for every table
} for(int k=0;k < n_tables; k++) {
database_dot_table.append("."); // Update the consistency
database_dot_table.append(string(table_names[k])); // information
tbrl_update_consistency(lheader, database_dot_table, gtid_known, gtid); if(db_names[k][0]=='\0') {
database_dot_table = qevent->db_name;
} else {
database_dot_table = string(db_names[k]);
}
database_dot_table.append(".");
database_dot_table.append(string(table_names[k]));
free(db_names[k]); tbrl_update_consistency(lheader, database_dot_table, gtid_known, gtid);
free(table_names[k]);
}
free(db_names);
free(table_names);
} else {
for(int k=0; k < n_tables; k++) {
free(db_names[k]);
free(table_names[k]);
}
free(db_names);
free(table_names);
}
std::cout << "Thread: " << id << " server_id " << lheader->server_id free(db_names[k]);
<< " position " << lheader->next_position << " : Found event of type " free(table_names[k]);
<< event->get_event_type() }
<< " txt " << get_event_type_str(event->get_event_type()) free(db_names);
<< " query " << qevent->query << " db " << qevent->db_name free(table_names);
<< " gtid " << gtid.get_string() } else {
<< std::endl; for(int k=0; k < n_tables; k++) {
break; free(db_names[k]);
} free(table_names[k]);
}
free(db_names);
free(table_names);
}
/* if (tbr_debug) {
Event is global transaction identifier. We need to store skygw_log_write_flush(NULL, LOGFILE_TRACE,
value of this and handle actual state later. (char *)"TRC Debug: Thread %ld Server %d Binlog_pos %lu event %d"
*/ " : %s Query %s DB %s gtid '%s'",
case GTID_EVENT_MARIADB: id,
case GTID_EVENT_MYSQL: lheader->server_id,
{ lheader->next_position,
Gtid_event *gevent = dynamic_cast<Gtid_event *>(event); event->get_event_type(),
get_event_type_str(event->get_event_type()),
qevent->query.c_str(),
qevent->db_name.c_str(),
gtid.get_string().c_str());
}
break;
}
if (binlog.get_mysql_server_type() == MYSQL_SERVER_TYPE_MARIADB) { /*
gtid_known = true; Event is global transaction identifier. We need to store
gtid = Gtid(gevent->domain_id, gevent->server_id, gevent->sequence_number); value of this and handle actual state later.
} else { */
gtid_known = true; case GTID_EVENT_MARIADB:
gtid = Gtid(gevent->m_mysql_gtid); case GTID_EVENT_MYSQL: {
} Gtid_event *gevent = dynamic_cast<Gtid_event *>(event);
std::cout << "Thread: " << id << " server_id " << lheader->server_id if (binlog.get_mysql_server_type() == MYSQL_SERVER_TYPE_MARIADB) {
<< " position " << lheader->next_position << " : Found event of type " gtid_known = true;
<< event->get_event_type() gtid = Gtid(gevent->domain_id, gevent->server_id, gevent->sequence_number);
<< " txt " << get_event_type_str(event->get_event_type()) } else {
<< " gtid " << gtid.get_string() gtid_known = true;
<< std::endl; gtid = Gtid(gevent->m_mysql_gtid);
}
if (tbr_debug) {
skygw_log_write_flush(NULL, LOGFILE_TRACE,
(char *)"TRC Debug: Thread %ld Server %d Binlog_pos %lu event %d"
" : %s gtid '%s'",
id,
lheader->server_id,
lheader->next_position,
event->get_event_type(),
get_event_type_str(event->get_event_type()),
gtid.get_string().c_str());
}
break; break;
}
} // With this event we know to which database and table the
// following events will be targeted.
case TABLE_MAP_EVENT: {
Table_map_event *table_map_event= dynamic_cast<Table_map_event*>(event);
database_dot_table= table_map_event->db_name;
database_dot_table.append(".");
database_dot_table.append(table_map_event->table_name);
tid2tname[table_map_event->table_id]= database_dot_table;
// With this event we know to which database and table the if (tbr_debug) {
// following events will be targeted. skygw_log_write_flush(NULL, LOGFILE_TRACE,
case TABLE_MAP_EVENT: (char *)"TRC Debug: Thread %ld Server %d Binlog_pos %lu event %d"
{ " : %s dbtable '%s' id %d",
Table_map_event *table_map_event= dynamic_cast<Table_map_event*>(event); id,
database_dot_table= table_map_event->db_name; lheader->server_id,
database_dot_table.append("."); lheader->next_position,
database_dot_table.append(table_map_event->table_name); event->get_event_type(),
tid2tname[table_map_event->table_id]= database_dot_table; get_event_type_str(event->get_event_type()),
break; database_dot_table.c_str(),
} table_map_event->table_id);
}
/* This is row based replication event containing INSERT, break;
UPDATE or DELETE clause broken to rows */ }
case WRITE_ROWS_EVENT:
case UPDATE_ROWS_EVENT:
case DELETE_ROWS_EVENT:
{
Row_event *revent = dynamic_cast<Row_event*>(event);
tb_it= tid2tname.begin();
tb_it= tid2tname.find(revent->table_id);
if (tb_it != tid2tname.end())
{
database_dot_table= tb_it->second;
}
// Update the consistency information /* This is row based replication event containing INSERT,
tbrl_update_consistency(lheader, database_dot_table, gtid_known, gtid); UPDATE or DELETE clause broken to rows */
case WRITE_ROWS_EVENT:
case UPDATE_ROWS_EVENT:
case DELETE_ROWS_EVENT: {
Row_event *revent = dynamic_cast<Row_event*>(event);
tb_it= tid2tname.begin();
tb_it= tid2tname.find(revent->table_id);
std::cout << "Thread: " << id << " server_id " << lheader->server_id // DB.table name found
<< " position " << lheader->next_position << " : Found event of type " if (tb_it != tid2tname.end())
<< event->get_event_type() {
<< " txt " << get_event_type_str(event->get_event_type()) database_dot_table= tb_it->second;
<< " table " << revent->table_id }
<< " tb " << database_dot_table
<< " gtid " << gtid.get_string()
<< std::endl;
break;
} // Update the consistency information
// Default event handler, do nothing tbrl_update_consistency(lheader, database_dot_table, gtid_known, gtid);
default:
break;
} // switch
} // while
} // try
catch(ListenerException e)
{
std::cerr << "Listener exception: " << e.what() << std::endl;
// Re-Throw this one.
throw;
}
catch(boost::system::error_code e)
{
std::cerr << "Listener system error: " << e.message() << std::endl;
// Re-Throw this one.
throw;
}
// Try and catch all exceptions
catch(std::exception const& e)
{
std::cerr << "Listener other error: " << e.what() << std::endl;
// Re-Throw this one.
throw;
}
// Rest of them
catch(...)
{
std::cerr << "Unknown exception: " << std::endl;
// Re-Throw this one.
// It was not handled so you want to make sure it is handled correctly by
// the OS. So just allow the exception to keep propagating.
throw;
}
// Thread execution will end here break;
pthread_exit(NULL);
return NULL;
}
// Default event handler, do nothing
default:
break;
} // switch
} // while
} // try
catch(ListenerException e)
{
string err = std::string("Listener exception: ")+ e.what();
skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str());
// Re-Throw this one.
throw;
}
catch(boost::system::error_code e)
{
string err = std::string("Listener system exception: ")+ e.message();
skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str());
// Re-Throw this one.
throw;
}
// Try and catch all exceptions
catch(std::exception const& e)
{
string err = std::string("Listener other exception: ")+ e.what();
skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str());
// Re-Throw this one.
throw;
}
// Rest of them
catch(...)
{
string err = std::string("Unknown exception: ");
skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str());
// Re-Throw this one.
// It was not handled so you want to make sure it is handled correctly by
// the OS. So just allow the exception to keep propagating.
throw;
}
if (tbr_trace) {
string trace_msg = string("Listener for server ") + string(uri) + string(server_type) + string(" shutting down");
skygw_log_write_flush(NULL, LOGFILE_TRACE, (char *)trace_msg.c_str());
}
// Thread execution will end here
pthread_exit(NULL);
return NULL;
} }
/***********************************************************************//** /***********************************************************************//**
@ -361,32 +444,43 @@ tb_replication_listener_shutdown(
if ( b_it != table_replication_listeners.end()) { if ( b_it != table_replication_listeners.end()) {
Binary_log *binlog = (*b_it).second; Binary_log *binlog = (*b_it).second;
if (tbr_debug) {
skygw_log_write_flush(NULL, LOGFILE_TRACE,
(char *)"TRC Debug: Shutting down replication listener for server %s",
binlog->get_url().c_str());
}
try { try {
binlog->shutdown(); binlog->shutdown();
} }
catch(ListenerException e) catch(ListenerException e)
{ {
std::cerr << "Listener exception: " << e.what() << std::endl; string err = std::string("Listener exception: ")+ e.what();
skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str());
// Re-Throw this one. // Re-Throw this one.
throw; throw;
} }
catch(boost::system::error_code e) catch(boost::system::error_code e)
{ {
std::cerr << "Listener system error: " << e.message() << std::endl; string err = std::string("Listener system exception: ")+ e.message();
skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str());
// Re-Throw this one. // Re-Throw this one.
throw; throw;
} }
// Try and catch all exceptions // Try and catch all exceptions
catch(std::exception const& e) catch(std::exception const& e)
{ {
std::cerr << "Listener other error: " << e.what() << std::endl; string err = std::string("Listener other exception: ")+ e.what();
skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str());
// Re-Throw this one. // Re-Throw this one.
throw; throw;
} }
// Rest of them // Rest of them
catch(...) catch(...)
{ {
std::cerr << "Unknown exception: " << std::endl; string err = std::string("Unknown exception: ");
skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str());
// Re-Throw this one. // Re-Throw this one.
// It was not handled so you want to make sure it is handled correctly by // It was not handled so you want to make sure it is handled correctly by
// the OS. So just allow the exception to keep propagating. // the OS. So just allow the exception to keep propagating.
@ -398,6 +492,9 @@ tb_replication_listener_shutdown(
std::string err = std::string("Replication listener for server_id = ") + to_string(server_id) + std::string(" not active "); std::string err = std::string("Replication listener for server_id = ") + to_string(server_id) + std::string(" not active ");
*error_message = (char *)malloc(err.size()+1); *error_message = (char *)malloc(err.size()+1);
strcpy(*error_message, err.c_str()); strcpy(*error_message, err.c_str());
skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str());
return (1); return (1);
} }
} }
@ -412,7 +509,7 @@ there is information how many results where available.
int int
tb_replication_listener_consistency( tb_replication_listener_consistency(
/*================================*/ /*================================*/
const char *db_dot_table, /*!< in: Fully qualified table const unsigned char *db_dot_table, /*!< in: Fully qualified table
name. */ name. */
table_consistency_t *tb_consistency, /*!< out: Consistency values. */ table_consistency_t *tb_consistency, /*!< out: Consistency values. */
boost::uint32_t server_no) /*!< in: Server */ boost::uint32_t server_no) /*!< in: Server */
@ -425,7 +522,7 @@ tb_replication_listener_consistency(
boost::mutex::scoped_lock lock(table_consistency_mutex); boost::mutex::scoped_lock lock(table_consistency_mutex);
// Loop through the consistency values // Loop through the consistency values
for(multimap<std::string, tbr_metadata_t*>::iterator i = table_consistency_map.find(db_dot_table); for(multimap<std::string, tbr_metadata_t*>::iterator i = table_consistency_map.find((char *)db_dot_table);
i != table_consistency_map.end(); ++i, ++cur_server) { i != table_consistency_map.end(); ++i, ++cur_server) {
if (cur_server == server_no) { if (cur_server == server_no) {
tc = (*i).second; tc = (*i).second;
@ -436,6 +533,12 @@ tb_replication_listener_consistency(
} }
if (found) { if (found) {
if (tbr_trace) {
// This will log error to log file
skygw_log_write_flush(NULL, LOGFILE_TRACE,
(char *)"TRC Trace: Current state for table %s in server %d binlog_pos %lu GTID '%s'",
tc->db_table, tc->server_id, tc->binlog_pos, tc->gtid);
}
return (1); return (1);
} else { } else {
return (0); return (0);
@ -473,6 +576,13 @@ tb_replication_listener_reconnect(
} }
if (found) { if (found) {
if (tbr_debug) {
skygw_log_write_flush(NULL, LOGFILE_TRACE,
(char *)"TRC Debug: Reconnecting to server %s",
binlog->get_url().c_str());
}
try { try {
// Shutdown the current listener thread // Shutdown the current listener thread
binlog->shutdown(); binlog->shutdown();
@ -503,33 +613,38 @@ tb_replication_listener_reconnect(
} }
catch(ListenerException e) catch(ListenerException e)
{ {
std::cerr << "Listener exception: " << e.what() << std::endl; string err = std::string("Listener exception: ")+ e.what();
skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str());
// Re-Throw this one. // Re-Throw this one.
throw; throw;
} }
catch(boost::system::error_code e) catch(boost::system::error_code e)
{ {
std::cerr << "Listener system error: " << e.message() << std::endl; string err = std::string("Listener system exception: ")+ e.message();
skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str());
// Re-Throw this one. // Re-Throw this one.
throw; throw;
} }
// Try and catch all exceptions // Try and catch all exceptions
catch(std::exception const& e) catch(std::exception const& e)
{ {
std::cerr << "Listener other error: " << e.what() << std::endl; string err = std::string("Listener other exception: ")+ e.what();
skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str());
// Re-Throw this one. // Re-Throw this one.
throw; throw;
} }
// Rest of them // Rest of them
catch(...) catch(...)
{ {
std::cerr << "Unknown exception: " << std::endl; string err = std::string("Unknown exception: ");
skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str());
// Re-Throw this one. // Re-Throw this one.
// It was not handled so you want to make sure it is handled correctly by // It was not handled so you want to make sure it is handled correctly by
// the OS. So just allow the exception to keep propagating. // the OS. So just allow the exception to keep propagating.
throw; throw;
} }
} else { } else {
errmsg = std::string("Replication listener not found"); errmsg = std::string("Replication listener not found");
error_message = (char *)errmsg.c_str(); error_message = (char *)errmsg.c_str();
@ -541,6 +656,7 @@ err_exit:
if (error_message) { if (error_message) {
rpl->error_message = (char *)malloc(strlen(error_message +1)); rpl->error_message = (char *)malloc(strlen(error_message +1));
strcpy(rpl->error_message, error_message); strcpy(rpl->error_message, error_message);
skygw_log_write_flush(NULL, LOGFILE_ERROR, error_message);
} }
return (1); return (1);
@ -556,45 +672,17 @@ void
/*======================================*/ /*======================================*/
void *arg) /*!< in: Master definition */ void *arg) /*!< in: Master definition */
{ {
replication_listener_t *master = (replication_listener_t*)arg; master = (replication_listener_t*)arg;
tbr_metadata_t **tm=NULL; tbr_metadata_t **tm=NULL;
bool err = false;
char *body = master->server_url; // Set up master connect info
size_t len = strlen(master->server_url); tbrl_extract_master_connect_info();
/* Find the beginning of the user name */
strncmp(body, "//", 2);
/* Find the user name, which is mandatory */
const char *user = body + 2;
const char *user_end= strpbrk(user, ":@");
/* Find the password, which can be empty */
assert(*user_end == ':' || *user_end == '@');
const char *const pass = user_end + 1; // Skip the ':' (or '@')
const char *pass_end = pass;
if (*user_end == ':')
{
pass_end = strchr(pass, '@');
}
/* Find the host name, which is mandatory */
// Skip the '@', if there is one
const char *host = *pass_end == '@' ? pass_end + 1 : pass_end;
const char *host_end = strchr(host, ':');
/* If no ':' was found there is no port, so the host end at the end
* of the string */
if (host_end == 0)
host_end = body + len;
/* Find the port number */
unsigned long portno = 3306;
if (*host_end == ':')
portno = strtoul(host_end + 1, NULL, 10);
while(listener_shutdown == false) { while(listener_shutdown == false) {
sleep(10000); // Sleep ~10 seconds sleep(10000); // Sleep ~10 seconds
{ try {
// Need to be protected by mutex to avoid concurrency problems // Need to be protected by mutex to avoid concurrency problems
boost::mutex::scoped_lock lock(table_consistency_mutex); boost::mutex::scoped_lock lock(table_consistency_mutex);
@ -614,14 +702,126 @@ void
lock.unlock(); lock.unlock();
// Insert or update metadata information // Insert or update metadata information
tbrm_write_metadata(host, user, pass, portno, tm, nelems); err = tbrm_write_metadata(
(const char *)master_host,
(const char *)master_user,
(const char *)master_passwd,
(unsigned int)master_port,
tm,
nelems);
free(tm);
} }
catch(ListenerException e)
{
string err = std::string("Listener exception: ")+ e.what();
skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str());
goto my_exit;
}
catch(boost::system::error_code e)
{
string err = std::string("Listener system exception: ")+ e.message();
skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str());
goto my_exit;
}
// Try and catch all exceptions
catch(std::exception const& e)
{
string err = std::string("Listener other exception: ")+ e.what();
skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str());
goto my_exit;
}
// Rest of them
catch(...)
{
string err = std::string("Unknown exception: ");
skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str());
goto my_exit;
}
}
my_exit:
if (tbr_trace) {
skygw_log_write_flush(NULL, LOGFILE_TRACE, (char *)"Shutting down the metadata updater thread");
} }
pthread_exit(NULL); pthread_exit(NULL);
return NULL; return NULL;
} }
/***********************************************************************//**
Write current state of the metadata to the MySQL server and
clean up the data structures.
@return 0 on success, error code at failure. */
int
tb_replication_listener_done(
/*==========================*/
char **error_message) /*!< out: error message */
{
size_t nelems = table_consistency_map.size();
size_t k =0;
tbr_metadata_t **tm=NULL;
int err = 0;
tm = (tbr_metadata_t**)calloc(nelems, sizeof(tbr_metadata_t*));
try {
for(multimap<std::string, tbr_metadata_t*>::iterator i = table_consistency_map.begin();
i != table_consistency_map.end(); ++i,++k) {
tm[k] = ((*i).second);
}
// Insert or update metadata information
err = tbrm_write_metadata(master_host, master_user, master_passwd, master_port, tm, (size_t)nelems);
free(tm);
// Clean up memory allocation for multimap items
for(multimap<std::string, tbr_metadata_t*>::iterator i = table_consistency_map.begin();
i != table_consistency_map.end(); ++i,++k) {
tbr_metadata_t *trm = ((*i).second);
free(trm->db_table);
free(trm->gtid);
table_consistency_map.erase(i);
free(trm);
}
// Clean up binlog listeners
table_replication_listeners.erase(table_replication_listeners.begin(), table_replication_listeners.end());
}
catch(ListenerException e)
{
string err = std::string("Listener exception: ")+ e.what();
skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str());
}
catch(boost::system::error_code e)
{
string err = std::string("Listener system exception: ")+ e.message();
skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str());
}
// Try and catch all exceptions
catch(std::exception const& e)
{
string err = std::string("Listener other exception: ")+ e.what();
skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str());
}
// Rest of them
catch(...)
{
string err = std::string("Unknown exception: ");
skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str());
}
if (tbr_trace) {
skygw_log_write_flush(NULL, LOGFILE_TRACE, (char *)"Shutting down the listeners");
}
return err;
}
} // namespace table_replication_listener } // namespace table_replication_listener
} // namespace mysql } // namespace mysql

View File

@ -81,6 +81,25 @@ tb_replication_listener_shutdown(
boost::uint32_t server_id, /*!< in: server id */ boost::uint32_t server_id, /*!< in: server id */
char **error_message); /*!< out: error message */ char **error_message); /*!< out: error message */
/***********************************************************************//**
This internal function is executed on its own thread and it will write
table consistency information to the master database in every n seconds
based on configuration.
*/
void
*tb_replication_listener_metadata_updater(
/*======================================*/
void *arg); /*!< in: Master definition */
/***********************************************************************//**
Write current state of the metadata to the MySQL server and
clean up the data structures.
@return 0 on success, error code at failure. */
int
tb_replication_listener_done(
/*==========================*/
char **error_message); /*!< out: error message */
} // table_replication_listener } // table_replication_listener

View File

@ -36,6 +36,8 @@ Updated:
#include <mysql.h> #include <mysql.h>
#include <mysqld_error.h> #include <mysqld_error.h>
#include "table_replication_metadata.h" #include "table_replication_metadata.h"
#include "table_replication_consistency.h"
#include "log_manager.h"
namespace mysql { namespace mysql {
@ -51,9 +53,11 @@ tbrm_report_error(
const char *file, const char *file,
int line) int line)
{ {
fprintf(stderr, "%s at file %s line %d\n", message, file, line); skygw_log_write_flush(NULL, LOGFILE_ERROR,
(char *)"%s at file %s line %d", message, file, line);
if (con != NULL) { if (con != NULL) {
fprintf(stderr, "%s\n", mysql_error(con)); skygw_log_write_flush(NULL, LOGFILE_ERROR,
(char *)"%s", mysql_error(con));
mysql_close(con); mysql_close(con);
} }
} }
@ -68,10 +72,13 @@ tbrm_stmt_error(
const char *file, const char *file,
int line) int line)
{ {
fprintf (stderr, "%s at file %s line %d\n", message, file, line); skygw_log_write_flush(NULL, LOGFILE_ERROR,
(char *)"%s at file %s line %d", message, file, line);
if (stmt != NULL) if (stmt != NULL)
{ {
fprintf (stderr, "Error %u (%s): %s\n", skygw_log_write_flush(NULL, LOGFILE_ERROR,
(char *)"Error %u (%s): %s\n",
mysql_stmt_errno (stmt), mysql_stmt_errno (stmt),
mysql_stmt_sqlstate (stmt), mysql_stmt_sqlstate (stmt),
mysql_stmt_error (stmt)); mysql_stmt_error (stmt));
@ -280,7 +287,7 @@ tbrm_write_metadata(
unsigned int master_port, /*!< in: master port */ unsigned int master_port, /*!< in: master port */
tbr_metadata_t **tbrm_meta, /*!< in: table replication consistency tbr_metadata_t **tbrm_meta, /*!< in: table replication consistency
metadata. */ metadata. */
boost::uint32_t tbrm_rows) /*!< in: number of rows read */ size_t tbrm_rows) /*!< in: number of rows read */
{ {
MYSQL *con = mysql_init(NULL); MYSQL *con = mysql_init(NULL);
int myerrno=0; int myerrno=0;
@ -457,6 +464,12 @@ tbrm_write_metadata(
tbrm_stmt_error(ustmt, "Error: Could not execute update statement", __FILE__, __LINE__); tbrm_stmt_error(ustmt, "Error: Could not execute update statement", __FILE__, __LINE__);
goto error_exit; goto error_exit;
} }
if (tbr_debug) {
skygw_log_write_flush(NULL, LOGFILE_TRACE,
(char *)"TRC Debug: Metadata state updated for %s in server %d is binlog_pos %lu gtid '%s'",
dbtable, serverid, binlogpos, gtid);
}
} else { } else {
// Insert the consistency information // Insert the consistency information
binlogpos = tbrm_meta[i]->binlog_pos; binlogpos = tbrm_meta[i]->binlog_pos;
@ -471,6 +484,12 @@ tbrm_write_metadata(
tbrm_stmt_error(istmt, "Error: Could not execute insert statement", __FILE__, __LINE__); tbrm_stmt_error(istmt, "Error: Could not execute insert statement", __FILE__, __LINE__);
goto error_exit; goto error_exit;
} }
if (tbr_debug) {
skygw_log_write_flush(NULL, LOGFILE_TRACE,
(char *)"TRC Debug: Metadata state inserted for %s in server %d is binlog_pos %lu gtid '%s'",
dbtable, serverid, binlogpos, gtid);
}
} }
} }

View File

@ -24,6 +24,11 @@ Updated:
#ifndef TABLE_REPLICATION_METADATA_H #ifndef TABLE_REPLICATION_METADATA_H
#define TABLE_REPLICATION_METADATA_H #define TABLE_REPLICATION_METADATA_H
namespace mysql {
namespace table_replication_metadata {
/* Structure definition for table replication oconsistency metadata */ /* Structure definition for table replication oconsistency metadata */
typedef struct { typedef struct {
unsigned char* db_table; /* Fully qualified db.table name, unsigned char* db_table; /* Fully qualified db.table name,
@ -65,6 +70,10 @@ tbrm_write_metadata(
metadata. */ metadata. */
size_t tbrm_rows); /*!< in: number of rows read */ size_t tbrm_rows); /*!< in: number of rows read */
#endif } // table_replication_metadata
} // mysql
#endif

View File

@ -29,6 +29,8 @@ Updated:
#include <stdlib.h> #include <stdlib.h>
#include "table_replication_parser.h" #include "table_replication_parser.h"
#include "table_replication_consistency.h"
#include "log_manager.h"
namespace mysql { namespace mysql {
@ -362,6 +364,12 @@ tbr_parser_table_names(
db_name[name_count] = dbname; db_name[name_count] = dbname;
table_name[name_count] = tbname; table_name[name_count] = tbname;
name_count++; name_count++;
if (tbr_debug) {
skygw_log_write_flush(NULL, LOGFILE_TRACE,
(char *)"TRC Debug: INSERT OR REPLACE to %s.%s",
dbname, tbname);
}
} else { } else {
free(dbname); free(dbname);
free(tbname); free(tbname);
@ -394,6 +402,12 @@ tbr_parser_table_names(
db_name[name_count] = dbname; db_name[name_count] = dbname;
table_name[name_count] = tbname; table_name[name_count] = tbname;
name_count++; name_count++;
if (tbr_debug) {
skygw_log_write_flush(NULL, LOGFILE_TRACE,
(char *)"TRC Debug: DELETE OR UPDATE to %s.%s",
dbname, tbname);
}
} else { } else {
free(dbname); free(dbname);
free(tbname); free(tbname);
@ -417,6 +431,12 @@ tbr_parser_table_names(
db_name[name_count] = dbname; db_name[name_count] = dbname;
table_name[name_count] = tbname; table_name[name_count] = tbname;
name_count++; name_count++;
if (tbr_debug) {
skygw_log_write_flush(NULL, LOGFILE_TRACE,
(char *)"TRC Debug: LOAD to %s.%s",
dbname, tbname);
}
} else { } else {
free(dbname); free(dbname);
free(tbname); free(tbname);

View File

@ -18,8 +18,17 @@ find_path(SkySQL_INCLUDE_DIR skygw_debug.h
/usr/local/include /usr/include ../../utils) /usr/local/include /usr/include ../../utils)
include_directories(${SkySQL_INCLUDE_DIR}) include_directories(${SkySQL_INCLUDE_DIR})
find_path(TRC_INCLUDE_DIR table_replication_consistency.h
../ /usr/include /usr/local/include)
include_directories(${TRC_INCLUDE_DIR})
# Build rule for example # Build rule for example
foreach(prog Example) foreach(prog Example)
ADD_EXECUTABLE(${prog} ${prog}.c) ADD_EXECUTABLE(${prog} ${prog}.c ../../utils/skygw_utils.o )
TARGET_LINK_LIBRARIES(${prog} table_replication_consistency.a replication boost_system pthread stdc++ ${MySQL_LIBRARY} crypt aio) TARGET_LINK_LIBRARIES(${prog} table_replication_consistency.a replication boost_system pthread stdc++ ${MySQL_LIBRARY} crypt aio log_manager)
endforeach()
foreach(prog test)
ADD_EXECUTABLE(${prog} ${prog}.cpp ../../utils/skygw_utils.o )
TARGET_LINK_LIBRARIES(${prog} table_replication_consistency.a replication boost_system pthread stdc++ ${MySQL_LIBRARY} crypt aio log_manager)
endforeach() endforeach()

View File

@ -1,10 +1,13 @@
#include "table_replication_consistency.h"
#include <getopt.h> #include <getopt.h>
#include <stdlib.h> #include <stdlib.h>
#include <errno.h> #include <errno.h>
#include <stdio.h> #include <stdio.h>
#include <string.h> #include <string.h>
#include <mysql.h> #include <mysql.h>
#ifndef bool
#define bool int
#endif
#include "table_replication_consistency.h"
static char* server_options[] = { static char* server_options[] = {
"jan test", "jan test",
@ -24,13 +27,14 @@ static char* server_groups[] = {
NULL NULL
}; };
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
int i=0,k=0; int i=0,k=0;
char *uri; char *uri;
replication_listener_t *mrl; replication_listener_t *mrl;
int err=0; int err=0;
char *errstr=NULL;
// This will initialize MySQL // This will initialize MySQL
if (mysql_server_init(num_elements, server_options, server_groups)) { if (mysql_server_init(num_elements, server_options, server_groups)) {
@ -62,7 +66,7 @@ int main(int argc, char** argv)
} }
}//end of outer while loop }//end of outer while loop
err = tb_replication_consistency_init(mrl, k, 5); err = tb_replication_consistency_init(mrl, k, 5, TBR_TRACE_DEBUG);
if (err ) { if (err ) {
perror(NULL); perror(NULL);
@ -73,6 +77,13 @@ int main(int argc, char** argv)
sleep(3); sleep(3);
} }
err = tb_replication_consistency_shutdown(&errstr);
if (*errstr) {
fprintf(stderr, "%s\n", errstr);
free(errstr);
}
exit(0); exit(0);
} }

View File

@ -111,6 +111,19 @@ Example/fast:
$(MAKE) -f CMakeFiles/Example.dir/build.make CMakeFiles/Example.dir/build $(MAKE) -f CMakeFiles/Example.dir/build.make CMakeFiles/Example.dir/build
.PHONY : Example/fast .PHONY : Example/fast
#=============================================================================
# Target rules for targets named test
# Build rule for target.
test: cmake_check_build_system
$(MAKE) -f CMakeFiles/Makefile2 test
.PHONY : test
# fast build rule for target.
test/fast:
$(MAKE) -f CMakeFiles/test.dir/build.make CMakeFiles/test.dir/build
.PHONY : test/fast
Example.o: Example.c.o Example.o: Example.c.o
.PHONY : Example.o .PHONY : Example.o
@ -135,6 +148,30 @@ Example.c.s:
$(MAKE) -f CMakeFiles/Example.dir/build.make CMakeFiles/Example.dir/Example.c.s $(MAKE) -f CMakeFiles/Example.dir/build.make CMakeFiles/Example.dir/Example.c.s
.PHONY : Example.c.s .PHONY : Example.c.s
test.o: test.cpp.o
.PHONY : test.o
# target to build an object file
test.cpp.o:
$(MAKE) -f CMakeFiles/test.dir/build.make CMakeFiles/test.dir/test.cpp.o
.PHONY : test.cpp.o
test.i: test.cpp.i
.PHONY : test.i
# target to preprocess a source file
test.cpp.i:
$(MAKE) -f CMakeFiles/test.dir/build.make CMakeFiles/test.dir/test.cpp.i
.PHONY : test.cpp.i
test.s: test.cpp.s
.PHONY : test.s
# target to generate assembly for a file
test.cpp.s:
$(MAKE) -f CMakeFiles/test.dir/build.make CMakeFiles/test.dir/test.cpp.s
.PHONY : test.cpp.s
# Help Target # Help Target
help: help:
@echo "The following are some of the valid targets for this Makefile:" @echo "The following are some of the valid targets for this Makefile:"
@ -144,9 +181,13 @@ help:
@echo "... Example" @echo "... Example"
@echo "... edit_cache" @echo "... edit_cache"
@echo "... rebuild_cache" @echo "... rebuild_cache"
@echo "... test"
@echo "... Example.o" @echo "... Example.o"
@echo "... Example.i" @echo "... Example.i"
@echo "... Example.s" @echo "... Example.s"
@echo "... test.o"
@echo "... test.i"
@echo "... test.s"
.PHONY : help .PHONY : help

View File

@ -12,7 +12,7 @@ IF(NOT DEFINED CMAKE_INSTALL_CONFIG_NAME)
STRING(REGEX REPLACE "^[^A-Za-z0-9_]+" "" STRING(REGEX REPLACE "^[^A-Za-z0-9_]+" ""
CMAKE_INSTALL_CONFIG_NAME "${BUILD_TYPE}") CMAKE_INSTALL_CONFIG_NAME "${BUILD_TYPE}")
ELSE(BUILD_TYPE) ELSE(BUILD_TYPE)
SET(CMAKE_INSTALL_CONFIG_NAME "") SET(CMAKE_INSTALL_CONFIG_NAME "Debug")
ENDIF(BUILD_TYPE) ENDIF(BUILD_TYPE)
MESSAGE(STATUS "Install configuration: \"${CMAKE_INSTALL_CONFIG_NAME}\"") MESSAGE(STATUS "Install configuration: \"${CMAKE_INSTALL_CONFIG_NAME}\"")
ENDIF(NOT DEFINED CMAKE_INSTALL_CONFIG_NAME) ENDIF(NOT DEFINED CMAKE_INSTALL_CONFIG_NAME)