Added missing files from table replication consistency

This commit is contained in:
Jan Lindström 2013-06-21 09:13:06 +03:00
parent bc3a104e3f
commit 33b5d9c62b
4 changed files with 626 additions and 0 deletions

View File

@ -0,0 +1,80 @@
# Install script for directory: /home/jan/skysql/skygateway/table_replication_consistency
# Set the install prefix
IF(NOT DEFINED CMAKE_INSTALL_PREFIX)
SET(CMAKE_INSTALL_PREFIX "/usr/local")
ENDIF(NOT DEFINED CMAKE_INSTALL_PREFIX)
STRING(REGEX REPLACE "/$" "" CMAKE_INSTALL_PREFIX "${CMAKE_INSTALL_PREFIX}")
# Set the install configuration name.
IF(NOT DEFINED CMAKE_INSTALL_CONFIG_NAME)
IF(BUILD_TYPE)
STRING(REGEX REPLACE "^[^A-Za-z0-9_]+" ""
CMAKE_INSTALL_CONFIG_NAME "${BUILD_TYPE}")
ELSE(BUILD_TYPE)
SET(CMAKE_INSTALL_CONFIG_NAME "")
ENDIF(BUILD_TYPE)
MESSAGE(STATUS "Install configuration: \"${CMAKE_INSTALL_CONFIG_NAME}\"")
ENDIF(NOT DEFINED CMAKE_INSTALL_CONFIG_NAME)
# Set the component getting installed.
IF(NOT CMAKE_INSTALL_COMPONENT)
IF(COMPONENT)
MESSAGE(STATUS "Install component: \"${COMPONENT}\"")
SET(CMAKE_INSTALL_COMPONENT "${COMPONENT}")
ELSE(COMPONENT)
SET(CMAKE_INSTALL_COMPONENT)
ENDIF(COMPONENT)
ENDIF(NOT CMAKE_INSTALL_COMPONENT)
# Install shared libraries without execute permission?
IF(NOT DEFINED CMAKE_INSTALL_SO_NO_EXE)
SET(CMAKE_INSTALL_SO_NO_EXE "1")
ENDIF(NOT DEFINED CMAKE_INSTALL_SO_NO_EXE)
IF(NOT CMAKE_INSTALL_COMPONENT OR "${CMAKE_INSTALL_COMPONENT}" STREQUAL "Unspecified")
FOREACH(file
"$ENV{DESTDIR}${CMAKE_INSTALL_PREFIX}/lib/libtable_replication_consistency.so.0.1"
"$ENV{DESTDIR}${CMAKE_INSTALL_PREFIX}/lib/libtable_replication_consistency.so.1"
"$ENV{DESTDIR}${CMAKE_INSTALL_PREFIX}/lib/libtable_replication_consistency.so"
)
IF(EXISTS "${file}" AND
NOT IS_SYMLINK "${file}")
FILE(RPATH_CHECK
FILE "${file}"
RPATH "")
ENDIF()
ENDFOREACH()
FILE(INSTALL DESTINATION "${CMAKE_INSTALL_PREFIX}/lib" TYPE SHARED_LIBRARY FILES
"/home/jan/skysql/skygateway/table_replication_consistency/libtable_replication_consistency.so.0.1"
"/home/jan/skysql/skygateway/table_replication_consistency/libtable_replication_consistency.so.1"
"/home/jan/skysql/skygateway/table_replication_consistency/libtable_replication_consistency.so"
)
FOREACH(file
"$ENV{DESTDIR}${CMAKE_INSTALL_PREFIX}/lib/libtable_replication_consistency.so.0.1"
"$ENV{DESTDIR}${CMAKE_INSTALL_PREFIX}/lib/libtable_replication_consistency.so.1"
"$ENV{DESTDIR}${CMAKE_INSTALL_PREFIX}/lib/libtable_replication_consistency.so"
)
IF(EXISTS "${file}" AND
NOT IS_SYMLINK "${file}")
IF(CMAKE_INSTALL_DO_STRIP)
EXECUTE_PROCESS(COMMAND "/usr/bin/strip" "${file}")
ENDIF(CMAKE_INSTALL_DO_STRIP)
ENDIF()
ENDFOREACH()
ENDIF(NOT CMAKE_INSTALL_COMPONENT OR "${CMAKE_INSTALL_COMPONENT}" STREQUAL "Unspecified")
IF(NOT CMAKE_INSTALL_COMPONENT OR "${CMAKE_INSTALL_COMPONENT}" STREQUAL "Unspecified")
FILE(INSTALL DESTINATION "${CMAKE_INSTALL_PREFIX}/lib" TYPE STATIC_LIBRARY FILES "/home/jan/skysql/skygateway/table_replication_consistency/libtable_replication_consistency.a")
ENDIF(NOT CMAKE_INSTALL_COMPONENT OR "${CMAKE_INSTALL_COMPONENT}" STREQUAL "Unspecified")
IF(CMAKE_INSTALL_COMPONENT)
SET(CMAKE_INSTALL_MANIFEST "install_manifest_${CMAKE_INSTALL_COMPONENT}.txt")
ELSE(CMAKE_INSTALL_COMPONENT)
SET(CMAKE_INSTALL_MANIFEST "install_manifest.txt")
ENDIF(CMAKE_INSTALL_COMPONENT)
FILE(WRITE "/home/jan/skysql/skygateway/table_replication_consistency/${CMAKE_INSTALL_MANIFEST}" "")
FOREACH(file ${CMAKE_INSTALL_MANIFEST_FILES})
FILE(APPEND "/home/jan/skysql/skygateway/table_replication_consistency/${CMAKE_INSTALL_MANIFEST}" "${file}\n")
ENDFOREACH(file)

View File

@ -0,0 +1,317 @@
/*
Copyright (C) 2013, SkySQL Ab
This file is distributed as part of the SkySQL Gateway. It is free
software: you can redistribute it and/or modify it under the terms of the
GNU General Public License as published by the Free Software Foundation,
version 2.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
details.
You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc., 51
Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
Author: Jan Lindström jan.lindstrom@skysql.com
Created: 20-06-2013
Updated:
*/
#include <iostream>
#include "my_pthread.h"
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <regex.h>
#include <algorithm>
#include <sstream>
#include <boost/system/system_error.hpp>
#include "table_replication_consistency.h"
#include "table_replication_listener.h"
#include "listener_exception.h"
/* Global memory */
pthread_t *replication_listener_tid = NULL;
unsigned int n_replication_listeners = 0;
/* Namespaces */
using namespace std;
using namespace mysql;
using namespace mysql::replication_listener;
/***********************************************************************//**
This function will register replication listener for every server
provided and initialize all internal data structures and starts listening
the replication stream.
@return 0 on success, error code at failure. */
int
tb_replication_consistency_init(
/*============================*/
replication_listener_t *rpl, /*!< in: Server
definition. */
int n_servers, /*!< in: Number of servers */
unsigned int gateway_server_id) /*!< in: Gateway slave
server id. */
{
boost::uint32_t i;
int err = 0;
string errmsg="";
replication_listener_tid = (pthread_t*)malloc(sizeof(pthread_t) * (n_servers + 1));
if (replication_listener_tid == NULL) {
errmsg = string("Table_Replication_Consistency: out of memory");
goto error_handling;
}
for(i=0;i < n_servers; i++) {
try {
rpl[i].gateway_slave_server_id = gateway_server_id;
rpl[i].listener_id = i;
err = pthread_create(
&replication_listener_tid[i],
NULL,
&(tb_replication_listener_reader),
(void *) &(rpl[i]));
if (err) {
errmsg = string(strerror(err));
goto error_handling;
}
}
catch(ListenerException const& e)
{
errmsg = e.what();
goto error_handling;
}
catch(boost::system::error_code const& e)
{
errmsg = e.message();
goto error_handling;
}
// Try and catch all exceptions
catch(std::exception const& e)
{
errmsg = e.what();
goto error_handling;
}
// Rest of them
catch(...)
{
errmsg = std::string("Unknown exception: ");
goto error_handling;
}
}
n_replication_listeners = i;
/* We will try to join the threads at shutdown */
return (0);
error_handling:
n_replication_listeners = i;
rpl[i].error_message = (char *)malloc(errmsg.size()+1);
strcpy(rpl[i].error_message, errmsg.c_str());
return (1);
}
/***********************************************************************//**
With this fuction client can request table consistency status for a
single table. As a return client will receive a number of consistency
status structures. Client must allocate memory for consistency result
array and provide the maximum number of values returned. At return
there is information how many results where available.
@return 0 on success, error code at failure. */
int
tb_replication_consistency_query(
/*=============================*/
table_consistency_query_t *tb_query, /*!< in: Table consistency
query. */
table_consistency_t *tb_consistency, /*!< in: Table consistency
status structure.*/
int *n_servers) /*!< inout: Number of
servers where to get table
consistency status. Out: Number
of successfull consistency
query results. */
{
int err = 0;
boost::uint32_t i = 0;
std::string errmsg ="";
// We need to protect C client from exceptions here
try {
for(i = 0; i < *n_servers; i++) {
err = tb_replication_listener_consistency(tb_query->db_dot_table, &tb_consistency[i], i);
if (err) {
goto err_exit;
}
}
}
catch(mysql::ListenerException const& e)
{
errmsg = e.what();
goto error_handling;
}
catch(boost::system::error_code const& e)
{
errmsg = e.message();
goto error_handling;
}
// Try and catch all exceptions
catch(std::exception const& e)
{
errmsg = e.what();
goto error_handling;
}
// Rest of them
catch(...)
{
errmsg = std::string("Unknown exception: ");
goto error_handling;
}
*n_servers = i;
return (err);
error_handling:
tb_consistency[i].error_message = (char *)malloc(errmsg.size()+1);
strcpy(tb_consistency[i].error_message, errmsg.c_str());
err_exit:
*n_servers=i-1;
tb_consistency[i].error_code = err;
return (1);
}
/***********************************************************************//**
This function will reconnect replication listener to a server
provided.
@return 0 on success, error code at failure. */
int
tb_replication_consistency_reconnect(
/*=================================*/
replication_listener_t* rpl, /*!< in: Server definition.*/
unsigned int gateway_server_id) /*!< in: Gateway slave
server id. */
{
std::string errmsg ="";
int err = 0;
rpl->gateway_slave_server_id = gateway_server_id;
// We need to protect C client from exceptions here
try {
err = tb_replication_listener_reconnect(rpl, &replication_listener_tid[rpl->listener_id]);
if (err) {
goto err_exit;
}
}
catch(ListenerException const& e)
{
errmsg = e.what();
goto error_handling;
}
catch(boost::system::error_code const& e)
{
errmsg = e.message();
goto error_handling;
}
// Try and catch all exceptions
catch(std::exception const& e)
{
errmsg = e.what();
goto error_handling;
}
// Rest of them
catch(...)
{
errmsg = std::string("Unknown exception: ");
goto error_handling;
}
return (err);
error_handling:
rpl->error_message = (char *)malloc(errmsg.size()+1);
strcpy(rpl->error_message, errmsg.c_str());
err_exit:
return (1);
}
/***********************************************************************//**
This function is to shutdown the replication listener and free all
resources on table consistency. This function (TODO) will store
the current status on metadata to MySQL server.
@return 0 on success, error code at failure. */
int
tb_replication_consistency_shutdown(
char ** error_message) /*!< out: error message */
/*================================*/
{
int err = 0;
boost::uint32_t i = 0;
std::string errmsg ="";
// We need to protect C client from exceptions here
try {
for(i = 0; i < n_replication_listeners; i++) {
err = tb_replication_listener_shutdown(i, error_message);
if (err) {
goto err_exit;
}
}
// Need to wait until the thread exits
err = pthread_join(replication_listener_tid[i], (void **)error_message);
if (err) {
goto err_exit;
}
}
catch(mysql::ListenerException const& e)
{
errmsg = e.what();
goto error_handling;
}
catch(boost::system::error_code const& e)
{
errmsg = e.message();
goto error_handling;
}
// Try and catch all exceptions
catch(std::exception const& e)
{
errmsg = e.what();
goto error_handling;
}
// Rest of them
catch(...)
{
errmsg = std::string("Unknown exception: ");
goto error_handling;
}
return (err);
error_handling:
*error_message = (char *)malloc(errmsg.size()+1);
strcpy(*error_message, errmsg.c_str());
err_exit:
return (1);
}

View File

@ -0,0 +1,143 @@
/*
Copyright (C) 2013, SkySQL Ab
This file is distributed as part of the SkySQL Gateway. It is free
software: you can redistribute it and/or modify it under the terms of the
GNU General Public License as published by the Free Software Foundation,
version 2.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
details.
You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc., 51
Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
Author: Jan Lindström jan.lindstrom@skysql.com
Created: 20-06-2013
Updated:
*/
#ifndef TABLE_REPLICATION_CONSISTENCY_H
#define TABLE_REPLICATION_CONSISTENCY_H
/* Structure definition for replication listener */
typedef struct replication_listener {
char *server_url; /*!< in: Server address e.g.
mysql://root:pw@127.0.0.1:3308 */
unsigned long binlog_pos; /*!< in: Binlog position where to start
listening. */
int use_mariadb_gtid; /*!< in: 1 if MariaDB global
transaction id should be used for
binlog start position. */
int use_mysql_gtid; /*!< in: 1 if MySQL global
transaction id should be used for
binlog start position. */
int use_binlog_pos; /*!< in: 1 if binlog position
should be used for binlog start
position. */
char *gtid; /*!< in: Global transaction identifier
or NULL */
int is_master; /*!< in: Is this server a master 1 =
yes, 0 = no. */
int gateway_slave_server_id; /*!< in: replication listener slave
server id. */
int listener_id; /*!< in: listener id */
int connection_suggesfull; /*!< out: 0 if connection successfull
or error number. */
char *error_message; /*!< out: error message in case of
error. */
} replication_listener_t;
/* Structure definition for table consistency query */
typedef struct table_consistency_query {
char *db_dot_table; /*!< in: Fully qualified database and
table, e.g. Production.Orders. */
} table_consistency_query_t;
/* Structure definition for table consistency result */
typedef struct table_consistency {
char *db_dot_table; /*!< out: Fully qualified database and
table, e.g. Production.Orders. */
unsigned int server_id; /*!< out: Server id where the consitency
information is from. */
int mariadb_gtid_known; /*!< out: 1 if MariaDB global
transaction id is known. */
int mysql_gtid_known; /*!< out: 1 if MySQL global
transaction id is known. */
unsigned long binlog_pos; /*!< out: Last seen binlog position
on this server. */
char *gtid; /*!< out: If global transacition id
is known, will contain the id or NULL. */
int error_code; /*!< out: 0 if table consistency query
for this server succesfull or error
code. */
char *error_message; /*!< out: Error message if table
consistency query failed for this
server failed. */
} table_consistency_t;
/* Interface functions */
/***********************************************************************//**
This function will register replication listener for every server
provided and initialize all internal data structures and starts listening
the replication stream.
@return 0 on success, error code at failure. */
int
tb_replication_consistency_init(
/*============================*/
replication_listener_t *rpl, /*!< in: Server
definition. */
int n_servers, /*!< in: Number of servers */
unsigned int gateway_server_id);/*!< in: Gateway slave
server id. */
/***********************************************************************//**
With this fuction client can request table consistency status for a
single table. As a return client will receive a number of consistency
status structures. Client must allocate memory for consistency result
array and provide the maximum number of values returned. At return
there is information how many results where available.
@return 0 on success, error code at failure. */
int
tb_replication_consistency_query(
/*=============================*/
table_consistency_query_t *tb_query, /*!< in: Table consistency
query. */
table_consistency_t *tb_consistency, /*!< in: Table consistency
status structure.*/
int *n_servers); /*!< inout: Number of
servers where to get table
consistency status. Out: Number
of successfull consistency
query results. */
/***********************************************************************//**
This function will reconnect replication listener to a server
provided.
@return 0 on success, error code at failure. */
int
tb_replication_consistency_reconnect(
/*=================================*/
replication_listener_t* rpl, /*!< in: Server definition.*/
unsigned int gateway_server_id); /*!< in: Gateway slave
server id. */
/***********************************************************************//**
This function is to shutdown the replication listener and free all
resources on table consistency. This function (TODO) will store
the current status on metadata to MySQL server.
@return 0 on success, error code at failure. */
int
tb_replication_consistency_shutdown(
/*=================================*/
char ** error_message); /*!< out: error_message*/
#endif

View File

@ -0,0 +1,86 @@
/*
Copyright (C) 2013, SkySQL Ab
This file is distributed as part of the SkySQL Gateway. It is free
software: you can redistribute it and/or modify it under the terms of the
GNU General Public License as published by the Free Software Foundation,
version 2.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
details.
You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc., 51
Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
Author: Jan Lindström jan.lindstrom@skysql.com
Created: 20-06-2013
Updated:
*/
#ifndef TABLE_REPLICATION_LISTENER_H
#define TABLE_REPLICATION_LISTENER_H
#include <boost/cstdint.hpp>
namespace mysql
{
namespace replication_listener
{
/***********************************************************************//**
This is the function that is executed by replication listeners.
At startup it will try to connect the server and start listening
the actual replication stream.
@return Pointer to error message. */
void *tb_replication_listener_reader(
/*=================================*/
void *arg); /*!< in: Replication listener
definition. */
/***********************************************************************//**
With this fuction client can request table consistency status for a
single table. As a return client will receive a number of consistency
status structures. Client must allocate memory for consistency result
array and provide the maximum number of values returned. At return
there is information how many results where available.
@return 0 on success, error code at failure. */
int
tb_replication_listener_consistency(
/*================================*/
char *db_dot_table, /*!< in: Fully qualified table
name. */
table_consistency_t *tb_consistency, /*!< out: Consistency values. */
boost::uint32_t server_no); /*!< in: Server */
/***********************************************************************//**
This function will reconnect replication listener to a server
provided.
@return 0 on success, error code at failure. */
int
tb_replication_listener_reconnect(
/*==============================*/
replication_listener_t* rpl, /*!< in: Server definition.*/
pthread_t* tid); /*!< in: Thread id */
/***********************************************************************//**
This function is to shutdown the replication listener and free all
resources on table consistency. This function (TODO) will store
the current status on metadata to MySQL server.
@return 0 on success, error code at failure. */
int
tb_replication_listener_shutdown(
/*=================================*/
boost::uint32_t server_id, /*!< in: server id */
char **error_message); /*!< out: error message */
} // namespace replication_listener
} // namespace mysql
#endif