From 33b5d9c62bb0a1c92e912815ad7499e0504993a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Lindstr=C3=B6m?= Date: Fri, 21 Jun 2013 09:13:06 +0300 Subject: [PATCH] Added missing files from table replication consistency --- .../cmake_install.cmake | 80 +++++ .../table_replication_consistency.cpp | 317 ++++++++++++++++++ .../table_replication_consistency.h | 143 ++++++++ .../table_replication_listener.h | 86 +++++ 4 files changed, 626 insertions(+) create mode 100644 table_replication_consistency/cmake_install.cmake create mode 100644 table_replication_consistency/table_replication_consistency.cpp create mode 100644 table_replication_consistency/table_replication_consistency.h create mode 100644 table_replication_consistency/table_replication_listener.h diff --git a/table_replication_consistency/cmake_install.cmake b/table_replication_consistency/cmake_install.cmake new file mode 100644 index 000000000..2d78b5dc6 --- /dev/null +++ b/table_replication_consistency/cmake_install.cmake @@ -0,0 +1,80 @@ +# Install script for directory: /home/jan/skysql/skygateway/table_replication_consistency + +# Set the install prefix +IF(NOT DEFINED CMAKE_INSTALL_PREFIX) + SET(CMAKE_INSTALL_PREFIX "/usr/local") +ENDIF(NOT DEFINED CMAKE_INSTALL_PREFIX) +STRING(REGEX REPLACE "/$" "" CMAKE_INSTALL_PREFIX "${CMAKE_INSTALL_PREFIX}") + +# Set the install configuration name. +IF(NOT DEFINED CMAKE_INSTALL_CONFIG_NAME) + IF(BUILD_TYPE) + STRING(REGEX REPLACE "^[^A-Za-z0-9_]+" "" + CMAKE_INSTALL_CONFIG_NAME "${BUILD_TYPE}") + ELSE(BUILD_TYPE) + SET(CMAKE_INSTALL_CONFIG_NAME "") + ENDIF(BUILD_TYPE) + MESSAGE(STATUS "Install configuration: \"${CMAKE_INSTALL_CONFIG_NAME}\"") +ENDIF(NOT DEFINED CMAKE_INSTALL_CONFIG_NAME) + +# Set the component getting installed. +IF(NOT CMAKE_INSTALL_COMPONENT) + IF(COMPONENT) + MESSAGE(STATUS "Install component: \"${COMPONENT}\"") + SET(CMAKE_INSTALL_COMPONENT "${COMPONENT}") + ELSE(COMPONENT) + SET(CMAKE_INSTALL_COMPONENT) + ENDIF(COMPONENT) +ENDIF(NOT CMAKE_INSTALL_COMPONENT) + +# Install shared libraries without execute permission? +IF(NOT DEFINED CMAKE_INSTALL_SO_NO_EXE) + SET(CMAKE_INSTALL_SO_NO_EXE "1") +ENDIF(NOT DEFINED CMAKE_INSTALL_SO_NO_EXE) + +IF(NOT CMAKE_INSTALL_COMPONENT OR "${CMAKE_INSTALL_COMPONENT}" STREQUAL "Unspecified") + FOREACH(file + "$ENV{DESTDIR}${CMAKE_INSTALL_PREFIX}/lib/libtable_replication_consistency.so.0.1" + "$ENV{DESTDIR}${CMAKE_INSTALL_PREFIX}/lib/libtable_replication_consistency.so.1" + "$ENV{DESTDIR}${CMAKE_INSTALL_PREFIX}/lib/libtable_replication_consistency.so" + ) + IF(EXISTS "${file}" AND + NOT IS_SYMLINK "${file}") + FILE(RPATH_CHECK + FILE "${file}" + RPATH "") + ENDIF() + ENDFOREACH() + FILE(INSTALL DESTINATION "${CMAKE_INSTALL_PREFIX}/lib" TYPE SHARED_LIBRARY FILES + "/home/jan/skysql/skygateway/table_replication_consistency/libtable_replication_consistency.so.0.1" + "/home/jan/skysql/skygateway/table_replication_consistency/libtable_replication_consistency.so.1" + "/home/jan/skysql/skygateway/table_replication_consistency/libtable_replication_consistency.so" + ) + FOREACH(file + "$ENV{DESTDIR}${CMAKE_INSTALL_PREFIX}/lib/libtable_replication_consistency.so.0.1" + "$ENV{DESTDIR}${CMAKE_INSTALL_PREFIX}/lib/libtable_replication_consistency.so.1" + "$ENV{DESTDIR}${CMAKE_INSTALL_PREFIX}/lib/libtable_replication_consistency.so" + ) + IF(EXISTS "${file}" AND + NOT IS_SYMLINK "${file}") + IF(CMAKE_INSTALL_DO_STRIP) + EXECUTE_PROCESS(COMMAND "/usr/bin/strip" "${file}") + ENDIF(CMAKE_INSTALL_DO_STRIP) + ENDIF() + ENDFOREACH() +ENDIF(NOT CMAKE_INSTALL_COMPONENT OR "${CMAKE_INSTALL_COMPONENT}" STREQUAL "Unspecified") + +IF(NOT CMAKE_INSTALL_COMPONENT OR "${CMAKE_INSTALL_COMPONENT}" STREQUAL "Unspecified") + FILE(INSTALL DESTINATION "${CMAKE_INSTALL_PREFIX}/lib" TYPE STATIC_LIBRARY FILES "/home/jan/skysql/skygateway/table_replication_consistency/libtable_replication_consistency.a") +ENDIF(NOT CMAKE_INSTALL_COMPONENT OR "${CMAKE_INSTALL_COMPONENT}" STREQUAL "Unspecified") + +IF(CMAKE_INSTALL_COMPONENT) + SET(CMAKE_INSTALL_MANIFEST "install_manifest_${CMAKE_INSTALL_COMPONENT}.txt") +ELSE(CMAKE_INSTALL_COMPONENT) + SET(CMAKE_INSTALL_MANIFEST "install_manifest.txt") +ENDIF(CMAKE_INSTALL_COMPONENT) + +FILE(WRITE "/home/jan/skysql/skygateway/table_replication_consistency/${CMAKE_INSTALL_MANIFEST}" "") +FOREACH(file ${CMAKE_INSTALL_MANIFEST_FILES}) + FILE(APPEND "/home/jan/skysql/skygateway/table_replication_consistency/${CMAKE_INSTALL_MANIFEST}" "${file}\n") +ENDFOREACH(file) diff --git a/table_replication_consistency/table_replication_consistency.cpp b/table_replication_consistency/table_replication_consistency.cpp new file mode 100644 index 000000000..cc3dfc51e --- /dev/null +++ b/table_replication_consistency/table_replication_consistency.cpp @@ -0,0 +1,317 @@ +/* +Copyright (C) 2013, SkySQL Ab + + +This file is distributed as part of the SkySQL Gateway. It is free +software: you can redistribute it and/or modify it under the terms of the +GNU General Public License as published by the Free Software Foundation, +version 2. + +This program is distributed in the hope that it will be useful, but WITHOUT +ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +details. + +You should have received a copy of the GNU General Public License along with +this program; if not, write to the Free Software Foundation, Inc., 51 +Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +Author: Jan Lindström jan.lindstrom@skysql.com + +Created: 20-06-2013 +Updated: + +*/ +#include +#include "my_pthread.h" +#include +#include +#include +#include +#include +#include +#include +#include "table_replication_consistency.h" +#include "table_replication_listener.h" +#include "listener_exception.h" + + +/* Global memory */ + +pthread_t *replication_listener_tid = NULL; +unsigned int n_replication_listeners = 0; + +/* Namespaces */ + +using namespace std; +using namespace mysql; +using namespace mysql::replication_listener; + +/***********************************************************************//** +This function will register replication listener for every server +provided and initialize all internal data structures and starts listening +the replication stream. +@return 0 on success, error code at failure. */ +int +tb_replication_consistency_init( +/*============================*/ + replication_listener_t *rpl, /*!< in: Server + definition. */ + int n_servers, /*!< in: Number of servers */ + unsigned int gateway_server_id) /*!< in: Gateway slave + server id. */ +{ + boost::uint32_t i; + int err = 0; + string errmsg=""; + + replication_listener_tid = (pthread_t*)malloc(sizeof(pthread_t) * (n_servers + 1)); + + if (replication_listener_tid == NULL) { + errmsg = string("Table_Replication_Consistency: out of memory"); + goto error_handling; + } + + for(i=0;i < n_servers; i++) { + try { + rpl[i].gateway_slave_server_id = gateway_server_id; + rpl[i].listener_id = i; + + err = pthread_create( + &replication_listener_tid[i], + NULL, + &(tb_replication_listener_reader), + (void *) &(rpl[i])); + + if (err) { + errmsg = string(strerror(err)); + goto error_handling; + } + } + catch(ListenerException const& e) + { + errmsg = e.what(); + goto error_handling; + } + catch(boost::system::error_code const& e) + { + errmsg = e.message(); + goto error_handling; + } + // Try and catch all exceptions + catch(std::exception const& e) + { + errmsg = e.what(); + goto error_handling; + } + // Rest of them + catch(...) + { + errmsg = std::string("Unknown exception: "); + goto error_handling; + } + } + + n_replication_listeners = i; + /* We will try to join the threads at shutdown */ + return (0); + + error_handling: + n_replication_listeners = i; +rpl[i].error_message = (char *)malloc(errmsg.size()+1); + strcpy(rpl[i].error_message, errmsg.c_str()); + return (1); +} + +/***********************************************************************//** +With this fuction client can request table consistency status for a +single table. As a return client will receive a number of consistency +status structures. Client must allocate memory for consistency result +array and provide the maximum number of values returned. At return +there is information how many results where available. +@return 0 on success, error code at failure. */ +int +tb_replication_consistency_query( +/*=============================*/ + table_consistency_query_t *tb_query, /*!< in: Table consistency + query. */ + table_consistency_t *tb_consistency, /*!< in: Table consistency + status structure.*/ + int *n_servers) /*!< inout: Number of + servers where to get table + consistency status. Out: Number + of successfull consistency + query results. */ +{ + int err = 0; + boost::uint32_t i = 0; + std::string errmsg =""; + + // We need to protect C client from exceptions here + try { + for(i = 0; i < *n_servers; i++) { + err = tb_replication_listener_consistency(tb_query->db_dot_table, &tb_consistency[i], i); + + if (err) { + goto err_exit; + } + } + } + catch(mysql::ListenerException const& e) + { + errmsg = e.what(); + goto error_handling; + } + catch(boost::system::error_code const& e) + { + errmsg = e.message(); + goto error_handling; + } + // Try and catch all exceptions + catch(std::exception const& e) + { + errmsg = e.what(); + goto error_handling; + } + // Rest of them + catch(...) + { + errmsg = std::string("Unknown exception: "); + goto error_handling; + } + + *n_servers = i; + return (err); + +error_handling: + tb_consistency[i].error_message = (char *)malloc(errmsg.size()+1); + strcpy(tb_consistency[i].error_message, errmsg.c_str()); + +err_exit: + *n_servers=i-1; + tb_consistency[i].error_code = err; + + return (1); +} + +/***********************************************************************//** +This function will reconnect replication listener to a server +provided. +@return 0 on success, error code at failure. */ +int +tb_replication_consistency_reconnect( +/*=================================*/ + replication_listener_t* rpl, /*!< in: Server definition.*/ + unsigned int gateway_server_id) /*!< in: Gateway slave + server id. */ +{ + std::string errmsg =""; + int err = 0; + + rpl->gateway_slave_server_id = gateway_server_id; + + // We need to protect C client from exceptions here + try { + err = tb_replication_listener_reconnect(rpl, &replication_listener_tid[rpl->listener_id]); + + if (err) { + goto err_exit; + } + } + catch(ListenerException const& e) + { + errmsg = e.what(); + goto error_handling; + } + catch(boost::system::error_code const& e) + { + errmsg = e.message(); + goto error_handling; + } + // Try and catch all exceptions + catch(std::exception const& e) + { + errmsg = e.what(); + goto error_handling; + } + // Rest of them + catch(...) + { + errmsg = std::string("Unknown exception: "); + goto error_handling; + } + + return (err); + +error_handling: + rpl->error_message = (char *)malloc(errmsg.size()+1); + strcpy(rpl->error_message, errmsg.c_str()); + +err_exit: + return (1); +} + +/***********************************************************************//** +This function is to shutdown the replication listener and free all +resources on table consistency. This function (TODO) will store +the current status on metadata to MySQL server. +@return 0 on success, error code at failure. */ +int +tb_replication_consistency_shutdown( + char ** error_message) /*!< out: error message */ +/*================================*/ +{ + int err = 0; + boost::uint32_t i = 0; + std::string errmsg =""; + + // We need to protect C client from exceptions here + try { + for(i = 0; i < n_replication_listeners; i++) { + err = tb_replication_listener_shutdown(i, error_message); + + if (err) { + goto err_exit; + } + } + + // Need to wait until the thread exits + err = pthread_join(replication_listener_tid[i], (void **)error_message); + + if (err) { + goto err_exit; + } + } + catch(mysql::ListenerException const& e) + { + errmsg = e.what(); + goto error_handling; + } + catch(boost::system::error_code const& e) + { + errmsg = e.message(); + goto error_handling; + } + // Try and catch all exceptions + catch(std::exception const& e) + { + errmsg = e.what(); + goto error_handling; + } + // Rest of them + catch(...) + { + errmsg = std::string("Unknown exception: "); + goto error_handling; + } + + return (err); + +error_handling: + *error_message = (char *)malloc(errmsg.size()+1); + strcpy(*error_message, errmsg.c_str()); + +err_exit: + return (1); + +} diff --git a/table_replication_consistency/table_replication_consistency.h b/table_replication_consistency/table_replication_consistency.h new file mode 100644 index 000000000..a640cafad --- /dev/null +++ b/table_replication_consistency/table_replication_consistency.h @@ -0,0 +1,143 @@ +/* +Copyright (C) 2013, SkySQL Ab + + +This file is distributed as part of the SkySQL Gateway. It is free +software: you can redistribute it and/or modify it under the terms of the +GNU General Public License as published by the Free Software Foundation, +version 2. + +This program is distributed in the hope that it will be useful, but WITHOUT +ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +details. + +You should have received a copy of the GNU General Public License along with +this program; if not, write to the Free Software Foundation, Inc., 51 +Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +Author: Jan Lindström jan.lindstrom@skysql.com + +Created: 20-06-2013 +Updated: + +*/ + +#ifndef TABLE_REPLICATION_CONSISTENCY_H +#define TABLE_REPLICATION_CONSISTENCY_H + +/* Structure definition for replication listener */ +typedef struct replication_listener { + char *server_url; /*!< in: Server address e.g. + mysql://root:pw@127.0.0.1:3308 */ + unsigned long binlog_pos; /*!< in: Binlog position where to start + listening. */ + int use_mariadb_gtid; /*!< in: 1 if MariaDB global + transaction id should be used for + binlog start position. */ + int use_mysql_gtid; /*!< in: 1 if MySQL global + transaction id should be used for + binlog start position. */ + int use_binlog_pos; /*!< in: 1 if binlog position + should be used for binlog start + position. */ + char *gtid; /*!< in: Global transaction identifier + or NULL */ + int is_master; /*!< in: Is this server a master 1 = + yes, 0 = no. */ + int gateway_slave_server_id; /*!< in: replication listener slave + server id. */ + int listener_id; /*!< in: listener id */ + int connection_suggesfull; /*!< out: 0 if connection successfull + or error number. */ + char *error_message; /*!< out: error message in case of + error. */ +} replication_listener_t; + +/* Structure definition for table consistency query */ +typedef struct table_consistency_query { + char *db_dot_table; /*!< in: Fully qualified database and + table, e.g. Production.Orders. */ +} table_consistency_query_t; + +/* Structure definition for table consistency result */ +typedef struct table_consistency { + char *db_dot_table; /*!< out: Fully qualified database and + table, e.g. Production.Orders. */ + unsigned int server_id; /*!< out: Server id where the consitency + information is from. */ + int mariadb_gtid_known; /*!< out: 1 if MariaDB global + transaction id is known. */ + int mysql_gtid_known; /*!< out: 1 if MySQL global + transaction id is known. */ + unsigned long binlog_pos; /*!< out: Last seen binlog position + on this server. */ + char *gtid; /*!< out: If global transacition id + is known, will contain the id or NULL. */ + int error_code; /*!< out: 0 if table consistency query + for this server succesfull or error + code. */ + char *error_message; /*!< out: Error message if table + consistency query failed for this + server failed. */ +} table_consistency_t; + +/* Interface functions */ + +/***********************************************************************//** +This function will register replication listener for every server +provided and initialize all internal data structures and starts listening +the replication stream. +@return 0 on success, error code at failure. */ +int +tb_replication_consistency_init( +/*============================*/ + replication_listener_t *rpl, /*!< in: Server + definition. */ + int n_servers, /*!< in: Number of servers */ + unsigned int gateway_server_id);/*!< in: Gateway slave + server id. */ + +/***********************************************************************//** +With this fuction client can request table consistency status for a +single table. As a return client will receive a number of consistency +status structures. Client must allocate memory for consistency result +array and provide the maximum number of values returned. At return +there is information how many results where available. +@return 0 on success, error code at failure. */ +int +tb_replication_consistency_query( +/*=============================*/ + table_consistency_query_t *tb_query, /*!< in: Table consistency + query. */ + table_consistency_t *tb_consistency, /*!< in: Table consistency + status structure.*/ + int *n_servers); /*!< inout: Number of + servers where to get table + consistency status. Out: Number + of successfull consistency + query results. */ + +/***********************************************************************//** +This function will reconnect replication listener to a server +provided. +@return 0 on success, error code at failure. */ +int +tb_replication_consistency_reconnect( +/*=================================*/ + replication_listener_t* rpl, /*!< in: Server definition.*/ + unsigned int gateway_server_id); /*!< in: Gateway slave + server id. */ + +/***********************************************************************//** +This function is to shutdown the replication listener and free all +resources on table consistency. This function (TODO) will store +the current status on metadata to MySQL server. +@return 0 on success, error code at failure. */ +int +tb_replication_consistency_shutdown( +/*=================================*/ + char ** error_message); /*!< out: error_message*/ + + +#endif diff --git a/table_replication_consistency/table_replication_listener.h b/table_replication_consistency/table_replication_listener.h new file mode 100644 index 000000000..712ffb42b --- /dev/null +++ b/table_replication_consistency/table_replication_listener.h @@ -0,0 +1,86 @@ +/* +Copyright (C) 2013, SkySQL Ab + + +This file is distributed as part of the SkySQL Gateway. It is free +software: you can redistribute it and/or modify it under the terms of the +GNU General Public License as published by the Free Software Foundation, +version 2. + +This program is distributed in the hope that it will be useful, but WITHOUT +ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +details. + +You should have received a copy of the GNU General Public License along with +this program; if not, write to the Free Software Foundation, Inc., 51 +Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +Author: Jan Lindström jan.lindstrom@skysql.com + +Created: 20-06-2013 +Updated: + +*/ + +#ifndef TABLE_REPLICATION_LISTENER_H +#define TABLE_REPLICATION_LISTENER_H + +#include + +namespace mysql +{ +namespace replication_listener +{ + +/***********************************************************************//** +This is the function that is executed by replication listeners. +At startup it will try to connect the server and start listening +the actual replication stream. +@return Pointer to error message. */ +void *tb_replication_listener_reader( +/*=================================*/ + void *arg); /*!< in: Replication listener + definition. */ + +/***********************************************************************//** +With this fuction client can request table consistency status for a +single table. As a return client will receive a number of consistency +status structures. Client must allocate memory for consistency result +array and provide the maximum number of values returned. At return +there is information how many results where available. +@return 0 on success, error code at failure. */ +int +tb_replication_listener_consistency( +/*================================*/ + char *db_dot_table, /*!< in: Fully qualified table + name. */ + table_consistency_t *tb_consistency, /*!< out: Consistency values. */ + boost::uint32_t server_no); /*!< in: Server */ + +/***********************************************************************//** +This function will reconnect replication listener to a server +provided. +@return 0 on success, error code at failure. */ +int +tb_replication_listener_reconnect( +/*==============================*/ + replication_listener_t* rpl, /*!< in: Server definition.*/ + pthread_t* tid); /*!< in: Thread id */ + +/***********************************************************************//** +This function is to shutdown the replication listener and free all +resources on table consistency. This function (TODO) will store +the current status on metadata to MySQL server. +@return 0 on success, error code at failure. */ +int +tb_replication_listener_shutdown( +/*=================================*/ + boost::uint32_t server_id, /*!< in: server id */ + char **error_message); /*!< out: error message */ + +} // namespace replication_listener + +} // namespace mysql + +#endif