Remove directory table_replication_consistency
Old stuff, not maintained, and not used by anything or anybody.
This commit is contained in:
parent
edef0a3733
commit
2486baffea
@ -1,61 +0,0 @@
|
||||
project (skysql_m_table_replication_concistency)
|
||||
cmake_minimum_required (VERSION 2.6)
|
||||
|
||||
# This configuration file builds both the static and shared version of
|
||||
# the library.
|
||||
|
||||
set(table_replication_consistency_sources table_replication_consistency.cpp table_replication_listener.cpp table_replication_parser.cpp table_replication_metadata.cpp)
|
||||
|
||||
# ---------- Find Boost Headers/Libraries -----------------------
|
||||
SET(Boost_DEBUG FALSE)
|
||||
SET(Boost_FIND_REQUIRED TRUE)
|
||||
SET(Boost_FIND_QUIETLY TRUE)
|
||||
SET(Boost_USE_STATIC_LIBS FALSE)
|
||||
SET(Boost_ADDITIONAL_VERSIONS "1.41" "1.41.0")
|
||||
FIND_PACKAGE(Boost REQUIRED system thread)
|
||||
|
||||
# --------- Find crypt
|
||||
FIND_LIBRARY(LIB_CRYPTO crypto /opt/local/lib /opt/lib /usr/lib /usr/local/lib /usr/local/ssl/lib)
|
||||
LINK_DIRECTORIES(${Boost_LIBRARY_DIRS})
|
||||
INCLUDE_DIRECTORIES(${Boost_INCLUDE_DIR})
|
||||
|
||||
# Find MySQL client library and header files
|
||||
find_library(MySQL_LIBRARY NAMES libmysqld.a PATHS
|
||||
/usr/lib64/mysql /usr/lib/mysql /usr/local/mysql/lib)
|
||||
find_path(MySQL_INCLUDE_DIR mysql.h
|
||||
/usr/local/include/mysql /usr/include/mysql /usr/local/mysql/include)
|
||||
include_directories(${MySQL_INCLUDE_DIR})
|
||||
|
||||
#MariaDB Corporation
|
||||
find_path(MariaDB_Corporation_INCLUDE_DIR skygw_debug.h
|
||||
/usr/local/include /usr/include ../utils)
|
||||
include_directories(${MariaDB_Corporation_INCLUDE_DIR})
|
||||
include_directories(../replication_listener)
|
||||
|
||||
#log_manager
|
||||
FIND_LIBRARY(LIB_LOGMANAGER log_manager /lib /opt/local/lib /opt/lib /usr/lib /usr/local/lib ../log_manager)
|
||||
find_path(LogManager_INCLUDE_DIR log_manager.h
|
||||
/usr/local/include /usr/include ../log_manager)
|
||||
include_directories(${LogManager_INCLUDE_DIR})
|
||||
|
||||
#debug settings
|
||||
SET(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -DTBR_DEBUG")
|
||||
SET(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -DTBR_DEBUG")
|
||||
|
||||
# Configure for building static library
|
||||
add_library(table_replication_consistency_static STATIC ${table_replication_consistency_sources})
|
||||
target_link_libraries(table_replication_consistency_static crypto log_manager ${Boost_LIBRARIES} ${MySQL_LIBRARY})
|
||||
set_target_properties(table_replication_consistency_static PROPERTIES
|
||||
OUTPUT_NAME "table_replication_consistency")
|
||||
|
||||
# Configure for building shared library
|
||||
add_library(table_replication_consistency_shared SHARED ${table_replication_consistency_sources})
|
||||
target_link_libraries(table_replication_consistency_shared crypto log_manager ${Boost_LIBRARIES} ${MySQL_LIBRARY})
|
||||
|
||||
set_target_properties(table_replication_consistency_shared PROPERTIES
|
||||
VERSION 0.1 SOVERSION 1
|
||||
OUTPUT_NAME "table_replication_consistency")
|
||||
|
||||
install(TARGETS table_replication_consistency_shared LIBRARY DESTINATION lib)
|
||||
install(TARGETS table_replication_consistency_static ARCHIVE DESTINATION lib)
|
||||
|
@ -1,56 +0,0 @@
|
||||
# Install script for directory: /home/jan/skysql/maxscale/table_replication_consistency
|
||||
|
||||
# Set the install prefix
|
||||
IF(NOT DEFINED CMAKE_INSTALL_PREFIX)
|
||||
SET(CMAKE_INSTALL_PREFIX "/usr/local")
|
||||
ENDIF(NOT DEFINED CMAKE_INSTALL_PREFIX)
|
||||
STRING(REGEX REPLACE "/$" "" CMAKE_INSTALL_PREFIX "${CMAKE_INSTALL_PREFIX}")
|
||||
|
||||
# Set the install configuration name.
|
||||
IF(NOT DEFINED CMAKE_INSTALL_CONFIG_NAME)
|
||||
IF(BUILD_TYPE)
|
||||
STRING(REGEX REPLACE "^[^A-Za-z0-9_]+" ""
|
||||
CMAKE_INSTALL_CONFIG_NAME "${BUILD_TYPE}")
|
||||
ELSE(BUILD_TYPE)
|
||||
SET(CMAKE_INSTALL_CONFIG_NAME "")
|
||||
ENDIF(BUILD_TYPE)
|
||||
MESSAGE(STATUS "Install configuration: \"${CMAKE_INSTALL_CONFIG_NAME}\"")
|
||||
ENDIF(NOT DEFINED CMAKE_INSTALL_CONFIG_NAME)
|
||||
|
||||
# Set the component getting installed.
|
||||
IF(NOT CMAKE_INSTALL_COMPONENT)
|
||||
IF(COMPONENT)
|
||||
MESSAGE(STATUS "Install component: \"${COMPONENT}\"")
|
||||
SET(CMAKE_INSTALL_COMPONENT "${COMPONENT}")
|
||||
ELSE(COMPONENT)
|
||||
SET(CMAKE_INSTALL_COMPONENT)
|
||||
ENDIF(COMPONENT)
|
||||
ENDIF(NOT CMAKE_INSTALL_COMPONENT)
|
||||
|
||||
# Install shared libraries without execute permission?
|
||||
IF(NOT DEFINED CMAKE_INSTALL_SO_NO_EXE)
|
||||
SET(CMAKE_INSTALL_SO_NO_EXE "1")
|
||||
ENDIF(NOT DEFINED CMAKE_INSTALL_SO_NO_EXE)
|
||||
|
||||
IF(NOT CMAKE_INSTALL_COMPONENT OR "${CMAKE_INSTALL_COMPONENT}" STREQUAL "Unspecified")
|
||||
FILE(INSTALL DESTINATION "${CMAKE_INSTALL_PREFIX}/lib" TYPE SHARED_LIBRARY FILES
|
||||
"/home/jan/skysql/maxscale/table_replication_consistency/CMakeFiles/CMakeRelink.dir/libtable_replication_consistency.so.0.1"
|
||||
"/home/jan/skysql/maxscale/table_replication_consistency/CMakeFiles/CMakeRelink.dir/libtable_replication_consistency.so.1"
|
||||
"/home/jan/skysql/maxscale/table_replication_consistency/CMakeFiles/CMakeRelink.dir/libtable_replication_consistency.so"
|
||||
)
|
||||
ENDIF(NOT CMAKE_INSTALL_COMPONENT OR "${CMAKE_INSTALL_COMPONENT}" STREQUAL "Unspecified")
|
||||
|
||||
IF(NOT CMAKE_INSTALL_COMPONENT OR "${CMAKE_INSTALL_COMPONENT}" STREQUAL "Unspecified")
|
||||
FILE(INSTALL DESTINATION "${CMAKE_INSTALL_PREFIX}/lib" TYPE STATIC_LIBRARY FILES "/home/jan/skysql/maxscale/table_replication_consistency/libtable_replication_consistency.a")
|
||||
ENDIF(NOT CMAKE_INSTALL_COMPONENT OR "${CMAKE_INSTALL_COMPONENT}" STREQUAL "Unspecified")
|
||||
|
||||
IF(CMAKE_INSTALL_COMPONENT)
|
||||
SET(CMAKE_INSTALL_MANIFEST "install_manifest_${CMAKE_INSTALL_COMPONENT}.txt")
|
||||
ELSE(CMAKE_INSTALL_COMPONENT)
|
||||
SET(CMAKE_INSTALL_MANIFEST "install_manifest.txt")
|
||||
ENDIF(CMAKE_INSTALL_COMPONENT)
|
||||
|
||||
FILE(WRITE "/home/jan/skysql/maxscale/table_replication_consistency/${CMAKE_INSTALL_MANIFEST}" "")
|
||||
FOREACH(file ${CMAKE_INSTALL_MANIFEST_FILES})
|
||||
FILE(APPEND "/home/jan/skysql/maxscale/table_replication_consistency/${CMAKE_INSTALL_MANIFEST}" "${file}\n")
|
||||
ENDFOREACH(file)
|
@ -1,426 +0,0 @@
|
||||
/*
|
||||
Copyright (C) 2013, MariaDB Corporation Ab
|
||||
|
||||
|
||||
This file is distributed as part of the MariaDB Corporation MaxScale. It is free
|
||||
software: you can redistribute it and/or modify it under the terms of the
|
||||
GNU General Public License as published by the Free Software Foundation,
|
||||
version 2.
|
||||
|
||||
This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
||||
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
|
||||
details.
|
||||
|
||||
You should have received a copy of the GNU General Public License along with
|
||||
this program; if not, write to the Free Software Foundation, Inc., 51
|
||||
Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
|
||||
Author: Jan Lindström jan.lindstrom@mariadb.com
|
||||
|
||||
Created: 20-06-2013
|
||||
Updated:
|
||||
|
||||
*/
|
||||
#include <iostream>
|
||||
#include <stdlib.h>
|
||||
#include <errno.h>
|
||||
#include <string.h>
|
||||
#include <regex.h>
|
||||
#include <algorithm>
|
||||
#include <sstream>
|
||||
#include <boost/system/system_error.hpp>
|
||||
#include "table_replication_consistency.h"
|
||||
#include "table_replication_listener.h"
|
||||
#include "table_replication_metadata.h"
|
||||
#include "listener_exception.h"
|
||||
#include "log_manager.h"
|
||||
|
||||
/* Global memory */
|
||||
|
||||
pthread_t *replication_listener_tid = NULL;
|
||||
pthread_t *replication_listener_metadata_tid=NULL;
|
||||
unsigned int n_replication_listeners = 0;
|
||||
|
||||
bool listener_shutdown=false; /* This flag will be true
|
||||
at shutdown */
|
||||
|
||||
|
||||
#ifdef TBR_DEBUG
|
||||
bool tbr_trace = true;
|
||||
bool tbr_debug = true;
|
||||
#else
|
||||
#ifdef TBR_TRACE
|
||||
bool tbr_trace = true;
|
||||
#else
|
||||
bool tbr_trace = false;
|
||||
bool tbr_debug = false;
|
||||
#endif
|
||||
#endif
|
||||
|
||||
/* Namespaces */
|
||||
|
||||
using namespace std;
|
||||
using namespace mysql;
|
||||
using namespace table_replication_listener;
|
||||
using namespace table_replication_metadata;
|
||||
|
||||
/***********************************************************************//**
|
||||
This function will register replication listener for every server
|
||||
provided and initialize all internal data structures and starts listening
|
||||
the replication stream.
|
||||
@return 0 on success, error code at failure. */
|
||||
int
|
||||
tb_replication_consistency_init(
|
||||
/*============================*/
|
||||
replication_listener_t *rpl, /*!< in: Server
|
||||
definition. */
|
||||
size_t n_servers, /*!< in: Number of servers */
|
||||
unsigned int gateway_server_id, /*!< in: MaxScale slave
|
||||
server id. */
|
||||
int trace_level) /*!< in: Trace level */
|
||||
{
|
||||
boost::uint32_t i;
|
||||
int err = 0;
|
||||
string errmsg="";
|
||||
|
||||
// Allocate memory for thread identifiers
|
||||
replication_listener_tid = (pthread_t*)malloc(sizeof(pthread_t) * (n_servers + 1));
|
||||
|
||||
if (replication_listener_tid == NULL) {
|
||||
errmsg = string("Fatal: Table_Replication_Consistency: out of memory");
|
||||
goto error_handling;
|
||||
}
|
||||
|
||||
replication_listener_metadata_tid = (pthread_t*)malloc(sizeof(pthread_t));
|
||||
|
||||
if (replication_listener_metadata_tid == NULL) {
|
||||
free(replication_listener_tid);
|
||||
errmsg = string("Fatal: Table_Replication_Consistency: out of memory");
|
||||
goto error_handling;
|
||||
}
|
||||
|
||||
// Set up trace level
|
||||
if (trace_level & TBR_TRACE_DEBUG) {
|
||||
tbr_debug = true;
|
||||
}
|
||||
|
||||
if (trace_level & TBR_TRACE_TRACE) {
|
||||
tbr_trace = true;
|
||||
}
|
||||
|
||||
// Find out the master server
|
||||
for(i=0;i < n_servers; i++) {
|
||||
if (rpl[i].is_master) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// If master is found read metadata from MySQL server, if not report error
|
||||
if (i < n_servers) {
|
||||
char *errm = NULL;
|
||||
if(!tb_replication_listener_init(&(rpl[i]), &errm)) {
|
||||
errmsg = std::string(errm);
|
||||
free(errm);
|
||||
}
|
||||
} else {
|
||||
errmsg = string("Master server is missing from configuration");
|
||||
goto error_handling;
|
||||
}
|
||||
|
||||
// Start replication stream reader thread for every server in the configuration
|
||||
for(i=0;i < n_servers; i++) {
|
||||
// We need to try catch all exceptions here because function
|
||||
// calling this service could be implemented using C-language
|
||||
// thus we need to protect it.
|
||||
try {
|
||||
rpl[i].gateway_slave_server_id = gateway_server_id;
|
||||
rpl[i].listener_id = i;
|
||||
|
||||
// For master we start also metadata updater
|
||||
if (rpl[i].is_master) {
|
||||
err = pthread_create(
|
||||
replication_listener_metadata_tid,
|
||||
NULL,
|
||||
&(tb_replication_listener_metadata_updater),
|
||||
(void *)&(rpl[i]));
|
||||
|
||||
if (err) {
|
||||
errmsg = string(strerror(err));
|
||||
goto error_handling;
|
||||
}
|
||||
}
|
||||
|
||||
// Start actual replication listener
|
||||
err = pthread_create(
|
||||
&replication_listener_tid[i],
|
||||
NULL,
|
||||
&(tb_replication_listener_reader),
|
||||
(void *) &(rpl[i]));
|
||||
|
||||
if (err) {
|
||||
errmsg = string(strerror(err));
|
||||
goto error_handling;
|
||||
}
|
||||
|
||||
}
|
||||
// Replication listener will use this exception for
|
||||
// error handling.
|
||||
catch(ListenerException const& e)
|
||||
{
|
||||
errmsg = e.what();
|
||||
goto error_handling;
|
||||
}
|
||||
// Boost library exceptions
|
||||
catch(boost::system::error_code const& e)
|
||||
{
|
||||
errmsg = e.message();
|
||||
goto error_handling;
|
||||
}
|
||||
// Try and catch all exceptions
|
||||
catch(std::exception const& e)
|
||||
{
|
||||
errmsg = e.what();
|
||||
goto error_handling;
|
||||
}
|
||||
// Rest of them
|
||||
catch(...)
|
||||
{
|
||||
errmsg = std::string("Unknown exception: ");
|
||||
goto error_handling;
|
||||
}
|
||||
}
|
||||
|
||||
// Number of threads
|
||||
n_replication_listeners = i;
|
||||
/* We will try to join the threads at shutdown */
|
||||
return (0);
|
||||
|
||||
error_handling:
|
||||
n_replication_listeners = i;
|
||||
rpl[i].error_message = (char *)malloc(errmsg.size()+1);
|
||||
strcpy(rpl[i].error_message, errmsg.c_str());
|
||||
|
||||
// This will log error to log file
|
||||
skygw_log_write_flush( LOGFILE_ERROR, (char *)errmsg.c_str());
|
||||
|
||||
return (1);
|
||||
}
|
||||
|
||||
/***********************************************************************//**
|
||||
With this fuction client can request table consistency status for a
|
||||
single table. As a return client will receive a number of consistency
|
||||
status structures. Client must allocate memory for consistency result
|
||||
array and provide the maximum number of values returned. At return
|
||||
there is information how many results where available.
|
||||
@return 0 on success, error code at failure. */
|
||||
int
|
||||
tb_replication_consistency_query(
|
||||
/*=============================*/
|
||||
table_consistency_query_t *tb_query, /*!< in: Table consistency
|
||||
query. */
|
||||
table_consistency_t *tb_consistency, /*!< in: Table consistency
|
||||
status structure.*/
|
||||
size_t *n_servers) /*!< inout: Number of
|
||||
servers where to get table
|
||||
consistency status. Out: Number
|
||||
of successfull consistency
|
||||
query results. */
|
||||
{
|
||||
int err = 0;
|
||||
boost::uint32_t i = 0;
|
||||
std::string errmsg ="";
|
||||
|
||||
// We need to protect C client from exceptions here
|
||||
try {
|
||||
for(i = 0; i < *n_servers; i++) {
|
||||
err = tb_replication_listener_consistency((const unsigned char *)tb_query->db_dot_table, &tb_consistency[i], i);
|
||||
|
||||
if (err) {
|
||||
goto err_exit;
|
||||
}
|
||||
}
|
||||
}
|
||||
catch(mysql::ListenerException const& e)
|
||||
{
|
||||
errmsg = e.what();
|
||||
goto error_handling;
|
||||
}
|
||||
catch(boost::system::error_code const& e)
|
||||
{
|
||||
errmsg = e.message();
|
||||
goto error_handling;
|
||||
}
|
||||
// Try and catch all exceptions
|
||||
catch(std::exception const& e)
|
||||
{
|
||||
errmsg = e.what();
|
||||
goto error_handling;
|
||||
}
|
||||
// Rest of them
|
||||
catch(...)
|
||||
{
|
||||
errmsg = std::string("Unknown exception: ");
|
||||
goto error_handling;
|
||||
}
|
||||
|
||||
*n_servers = i;
|
||||
return (err);
|
||||
|
||||
error_handling:
|
||||
tb_consistency[i].error_message = (char *)malloc(errmsg.size()+1);
|
||||
strcpy(tb_consistency[i].error_message, errmsg.c_str());
|
||||
// This will log error to log file
|
||||
skygw_log_write_flush( LOGFILE_ERROR, (char *)errmsg.c_str());
|
||||
|
||||
err_exit:
|
||||
*n_servers=i-1;
|
||||
tb_consistency[i].error_code = err;
|
||||
|
||||
return (1);
|
||||
}
|
||||
|
||||
/***********************************************************************//**
|
||||
This function will reconnect replication listener to a server
|
||||
provided.
|
||||
@return 0 on success, error code at failure. */
|
||||
int
|
||||
tb_replication_consistency_reconnect(
|
||||
/*=================================*/
|
||||
replication_listener_t* rpl, /*!< in: Server definition.*/
|
||||
unsigned int gateway_server_id) /*!< in: MaxScale slave
|
||||
server id. */
|
||||
{
|
||||
std::string errmsg ="";
|
||||
int err = 0;
|
||||
|
||||
rpl->gateway_slave_server_id = gateway_server_id;
|
||||
|
||||
// We need to protect C client from exceptions here
|
||||
try {
|
||||
err = tb_replication_listener_reconnect(rpl, &replication_listener_tid[rpl->listener_id]);
|
||||
|
||||
if (err) {
|
||||
goto err_exit;
|
||||
}
|
||||
}
|
||||
catch(ListenerException const& e)
|
||||
{
|
||||
errmsg = e.what();
|
||||
goto error_handling;
|
||||
}
|
||||
catch(boost::system::error_code const& e)
|
||||
{
|
||||
errmsg = e.message();
|
||||
goto error_handling;
|
||||
}
|
||||
// Try and catch all exceptions
|
||||
catch(std::exception const& e)
|
||||
{
|
||||
errmsg = e.what();
|
||||
goto error_handling;
|
||||
}
|
||||
// Rest of them
|
||||
catch(...)
|
||||
{
|
||||
errmsg = std::string("Unknown exception: ");
|
||||
goto error_handling;
|
||||
}
|
||||
|
||||
return (err);
|
||||
|
||||
error_handling:
|
||||
rpl->error_message = (char *)malloc(errmsg.size()+1);
|
||||
strcpy(rpl->error_message, errmsg.c_str());
|
||||
// This will log error to log file
|
||||
skygw_log_write_flush( LOGFILE_ERROR, (char *)errmsg.c_str());
|
||||
|
||||
err_exit:
|
||||
return (1);
|
||||
}
|
||||
|
||||
/***********************************************************************//**
|
||||
This function is to shutdown the replication listener and free all
|
||||
resources on table consistency. This function will store
|
||||
the current status on metadata to MySQL server.
|
||||
@return 0 on success, error code at failure. */
|
||||
int
|
||||
tb_replication_consistency_shutdown(
|
||||
/*================================*/
|
||||
char ** error_message) /*!< out: error message */
|
||||
{
|
||||
int err = 0;
|
||||
boost::uint32_t i = 0;
|
||||
std::string errmsg ="";
|
||||
|
||||
// We need to protect C client from exceptions here
|
||||
try {
|
||||
|
||||
// Wait until all replication listeners are shut down
|
||||
for(i = 0; i < n_replication_listeners; i++) {
|
||||
|
||||
err = tb_replication_listener_shutdown(i, error_message);
|
||||
|
||||
if (err) {
|
||||
goto err_exit;
|
||||
}
|
||||
|
||||
// Need to wait until the thread exits
|
||||
err = pthread_join(replication_listener_tid[i], (void **)error_message);
|
||||
|
||||
if (err) {
|
||||
goto err_exit;
|
||||
}
|
||||
}
|
||||
|
||||
listener_shutdown = true;
|
||||
|
||||
// Wait until metadata writer has shut down
|
||||
err = pthread_join(*replication_listener_metadata_tid, NULL);
|
||||
|
||||
if (err) {
|
||||
goto err_exit;
|
||||
}
|
||||
|
||||
// Write metadata to MySQL storage and clean up
|
||||
err = tb_replication_listener_done(error_message);
|
||||
|
||||
if (err) {
|
||||
goto err_exit;
|
||||
}
|
||||
}
|
||||
catch(mysql::ListenerException const& e)
|
||||
{
|
||||
errmsg = e.what();
|
||||
goto error_handling;
|
||||
}
|
||||
catch(boost::system::error_code const& e)
|
||||
{
|
||||
errmsg = e.message();
|
||||
goto error_handling;
|
||||
}
|
||||
// Try and catch all exceptions
|
||||
catch(std::exception const& e)
|
||||
{
|
||||
errmsg = e.what();
|
||||
goto error_handling;
|
||||
}
|
||||
// Rest of them
|
||||
catch(...)
|
||||
{
|
||||
errmsg = std::string("Unknown exception: ");
|
||||
goto error_handling;
|
||||
}
|
||||
|
||||
return (err);
|
||||
|
||||
error_handling:
|
||||
*error_message = (char *)malloc(errmsg.size()+1);
|
||||
strcpy(*error_message, errmsg.c_str());
|
||||
// This will log error to log file
|
||||
skygw_log_write_flush( LOGFILE_ERROR, (char *)errmsg.c_str());
|
||||
|
||||
err_exit:
|
||||
return (1);
|
||||
|
||||
}
|
@ -1,166 +0,0 @@
|
||||
/*
|
||||
Copyright (C) 2013-2014, MariaDB Corporation Ab
|
||||
|
||||
|
||||
This file is distributed as part of the MariaDB Corporation MaxScale. It is free
|
||||
software: you can redistribute it and/or modify it under the terms of the
|
||||
GNU General Public License as published by the Free Software Foundation,
|
||||
version 2.
|
||||
|
||||
This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
||||
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
|
||||
details.
|
||||
|
||||
You should have received a copy of the GNU General Public License along with
|
||||
this program; if not, write to the Free Software Foundation, Inc., 51
|
||||
Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
|
||||
Author: Jan Lindström jan.lindstrom@mariadb.com
|
||||
|
||||
Created: 20-06-2013
|
||||
Updated:
|
||||
|
||||
*/
|
||||
|
||||
#ifndef TABLE_REPLICATION_CONSISTENCY_H
|
||||
#define TABLE_REPLICATION_CONSISTENCY_H
|
||||
|
||||
#include <skygw_debug.h>
|
||||
|
||||
/* Global trace variables */
|
||||
extern bool tbr_trace;
|
||||
extern bool tbr_debug;
|
||||
|
||||
/* Structure definition for replication listener */
|
||||
typedef struct {
|
||||
char *server_url; /*!< in: Server address e.g.
|
||||
mysql://root:pw@127.0.0.1:3308 */
|
||||
unsigned long binlog_pos; /*!< in: Binlog position where to start
|
||||
listening. */
|
||||
int use_mariadb_gtid; /*!< in: 1 if MariaDB global
|
||||
transaction id should be used for
|
||||
binlog start position. */
|
||||
int use_mysql_gtid; /*!< in: 1 if MySQL global
|
||||
transaction id should be used for
|
||||
binlog start position. */
|
||||
int use_binlog_pos; /*!< in: 1 if binlog position
|
||||
should be used for binlog start
|
||||
position. */
|
||||
unsigned char *gtid; /*!< in: Global transaction identifier
|
||||
or NULL */
|
||||
size_t gtid_length; /*!< in: Real size of GTID */
|
||||
int is_master; /*!< in: Is this server a master 1 =
|
||||
yes, 0 = no. */
|
||||
int gateway_slave_server_id; /*!< in: replication listener slave
|
||||
server id. */
|
||||
int listener_id; /*!< in: listener id */
|
||||
int connection_suggesfull; /*!< out: 0 if connection successfull
|
||||
or error number. */
|
||||
char *error_message; /*!< out: error message in case of
|
||||
error. */
|
||||
} replication_listener_t;
|
||||
|
||||
/* Structure definition for table consistency query */
|
||||
typedef struct table_consistency_query {
|
||||
unsigned char *db_dot_table; /*!< in: Fully qualified database and
|
||||
table, e.g. Production.Orders. */
|
||||
} table_consistency_query_t;
|
||||
|
||||
/* Structure definition for table consistency result */
|
||||
typedef struct table_consistency {
|
||||
unsigned char *db_dot_table;/*!< out: Fully qualified database and
|
||||
table, e.g. Production.Orders. */
|
||||
unsigned int server_id; /*!< out: Server id where the consitency
|
||||
information is from. */
|
||||
int mariadb_gtid_known; /*!< out: 1 if MariaDB global
|
||||
transaction id is known. */
|
||||
int mysql_gtid_known; /*!< out: 1 if MySQL global
|
||||
transaction id is known. */
|
||||
unsigned long binlog_pos; /*!< out: Last seen binlog position
|
||||
on this server. */
|
||||
unsigned char *gtid; /*!< out: If global transacition id
|
||||
is known, will contain the id or NULL. */
|
||||
size_t gtid_length; /*!< out: Real length of GTID */
|
||||
int error_code; /*!< out: 0 if table consistency query
|
||||
for this server succesfull or error
|
||||
code. */
|
||||
char *error_message; /*!< out: Error message if table
|
||||
consistency query failed for this
|
||||
server failed. */
|
||||
} table_consistency_t;
|
||||
|
||||
/* Definitions for trace level */
|
||||
#define TBR_TRACE_TRACE (1UL << 1) /* Trace only important events and
|
||||
periodical consistency information */
|
||||
|
||||
/* Full trace of selected events and consistency information */
|
||||
#define TBR_TRACE_DEBUG ((1UL << 2) | TBR_TRACE_TRACE)
|
||||
|
||||
extern bool listener_shutdown; /* This flag will be true
|
||||
at shutdown */
|
||||
|
||||
|
||||
EXTERN_C_BLOCK_BEGIN
|
||||
|
||||
/* Interface functions */
|
||||
|
||||
/***********************************************************************//**
|
||||
This function will register replication listener for every server
|
||||
provided and initialize all internal data structures and starts listening
|
||||
the replication stream.
|
||||
@return 0 on success, error code at failure. */
|
||||
int
|
||||
tb_replication_consistency_init(
|
||||
/*============================*/
|
||||
replication_listener_t *rpl, /*!< in: Server
|
||||
definition. */
|
||||
size_t n_servers, /*!< in: Number of servers */
|
||||
unsigned int gateway_server_id, /*!< in: MaxScale slave
|
||||
server id. */
|
||||
int trace_level); /*!< in: trace level */
|
||||
|
||||
/***********************************************************************//**
|
||||
With this fuction client can request table consistency status for a
|
||||
single table. As a return client will receive a number of consistency
|
||||
status structures. Client must allocate memory for consistency result
|
||||
array and provide the maximum number of values returned. At return
|
||||
there is information how many results where available.
|
||||
@return 0 on success, error code at failure. */
|
||||
int
|
||||
tb_replication_consistency_query(
|
||||
/*=============================*/
|
||||
table_consistency_query_t *tb_query, /*!< in: Table consistency
|
||||
query. */
|
||||
table_consistency_t *tb_consistency, /*!< in: Table consistency
|
||||
status structure.*/
|
||||
size_t *n_servers); /*!< inout: Number of
|
||||
servers where to get table
|
||||
consistency status. Out: Number
|
||||
of successfull consistency
|
||||
query results. */
|
||||
|
||||
/***********************************************************************//**
|
||||
This function will reconnect replication listener to a server
|
||||
provided.
|
||||
@return 0 on success, error code at failure. */
|
||||
int
|
||||
tb_replication_consistency_reconnect(
|
||||
/*=================================*/
|
||||
replication_listener_t* rpl, /*!< in: Server definition.*/
|
||||
unsigned int gateway_server_id); /*!< in: MaxScale slave
|
||||
server id. */
|
||||
|
||||
/***********************************************************************//**
|
||||
This function is to shutdown the replication listener and free all
|
||||
resources on table consistency. This function will store
|
||||
the current status on metadata to MySQL server.
|
||||
@return 0 on success, error code at failure. */
|
||||
int
|
||||
tb_replication_consistency_shutdown(
|
||||
/*================================*/
|
||||
char ** error_message); /*!< out: error_message*/
|
||||
|
||||
EXTERN_C_BLOCK_END
|
||||
|
||||
#endif
|
File diff suppressed because it is too large
Load Diff
@ -1,120 +0,0 @@
|
||||
/*
|
||||
Copyright (C) 2013-2014, MariaDB Corporation Ab
|
||||
|
||||
|
||||
This file is distributed as part of the MariaDB Corporation MaxScale. It is free
|
||||
software: you can redistribute it and/or modify it under the terms of the
|
||||
GNU General Public License as published by the Free Software Foundation,
|
||||
version 2.
|
||||
|
||||
This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
||||
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
|
||||
details.
|
||||
|
||||
You should have received a copy of the GNU General Public License along with
|
||||
this program; if not, write to the Free Software Foundation, Inc., 51
|
||||
Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
|
||||
Author: Jan Lindström jan.lindstrom@mariadb.com
|
||||
|
||||
Created: 20-06-2013
|
||||
Updated:
|
||||
|
||||
*/
|
||||
|
||||
#ifndef TABLE_REPLICATION_LISTENER_H
|
||||
#define TABLE_REPLICATION_LISTENER_H
|
||||
|
||||
#include <boost/cstdint.hpp>
|
||||
#include <skygw_debug.h>
|
||||
|
||||
namespace mysql
|
||||
{
|
||||
|
||||
namespace table_replication_listener
|
||||
{
|
||||
|
||||
/***********************************************************************//**
|
||||
This is the function that is executed by replication listeners.
|
||||
At startup it will try to connect the server and start listening
|
||||
the actual replication stream.
|
||||
@return Pointer to error message. */
|
||||
void *tb_replication_listener_reader(
|
||||
/*=================================*/
|
||||
void *arg); /*!< in: Replication listener
|
||||
definition. */
|
||||
|
||||
/***********************************************************************//**
|
||||
With this fuction client can request table consistency status for a
|
||||
single table. As a return client will receive a number of consistency
|
||||
status structures. Client must allocate memory for consistency result
|
||||
array and provide the maximum number of values returned. At return
|
||||
there is information how many results where available.
|
||||
@return 0 on success, error code at failure. */
|
||||
int
|
||||
tb_replication_listener_consistency(
|
||||
/*================================*/
|
||||
const unsigned char *db_dot_table, /*!< in: Fully qualified table
|
||||
name. */
|
||||
table_consistency_t *tb_consistency, /*!< out: Consistency values. */
|
||||
boost::uint32_t server_no); /*!< in: Server */
|
||||
|
||||
/***********************************************************************//**
|
||||
This function will reconnect replication listener to a server
|
||||
provided.
|
||||
@return 0 on success, error code at failure. */
|
||||
int
|
||||
tb_replication_listener_reconnect(
|
||||
/*==============================*/
|
||||
replication_listener_t* rpl, /*!< in: Server definition.*/
|
||||
pthread_t* tid); /*!< in: Thread id */
|
||||
|
||||
/***********************************************************************//**
|
||||
This function is to shutdown the replication listener and free all
|
||||
resources on table consistency. This function (TODO) will store
|
||||
the current status on metadata to MySQL server.
|
||||
@return 0 on success, error code at failure. */
|
||||
int
|
||||
tb_replication_listener_shutdown(
|
||||
/*=============================*/
|
||||
boost::uint32_t server_id, /*!< in: server id */
|
||||
char **error_message); /*!< out: error message */
|
||||
|
||||
/***********************************************************************//**
|
||||
This internal function is executed on its own thread and it will write
|
||||
table consistency information to the master database in every n seconds
|
||||
based on configuration.
|
||||
*/
|
||||
void
|
||||
*tb_replication_listener_metadata_updater(
|
||||
/*======================================*/
|
||||
void *arg); /*!< in: Master definition */
|
||||
|
||||
/***********************************************************************//**
|
||||
Read current state of the metadata from the MySQL server or create
|
||||
necessary metadata and initialize listener metadata.
|
||||
@return true on success, false on failure
|
||||
*/
|
||||
bool
|
||||
tb_replication_listener_init(
|
||||
/*=========================*/
|
||||
replication_listener_t* rpl, /*! in: Master server definition */
|
||||
char **error_message); /*!< out: error message */
|
||||
|
||||
/***********************************************************************//**
|
||||
Write current state of the metadata to the MySQL server and
|
||||
clean up the data structures.
|
||||
@return 0 on success, error code at failure. */
|
||||
int
|
||||
tb_replication_listener_done(
|
||||
/*==========================*/
|
||||
char **error_message); /*!< out: error message */
|
||||
|
||||
|
||||
} // table_replication_listener
|
||||
|
||||
} // mysql
|
||||
|
||||
|
||||
#endif
|
@ -1,984 +0,0 @@
|
||||
/*
|
||||
Copyright (C) 2013, MariaDB Corporation Ab
|
||||
|
||||
|
||||
This file is distributed as part of the MariaDB Corporation MaxScale. It is free
|
||||
software: you can redistribute it and/or modify it under the terms of the
|
||||
GNU General Public License as published by the Free Software Foundation,
|
||||
version 2.
|
||||
|
||||
This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
||||
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
|
||||
details.
|
||||
|
||||
You should have received a copy of the GNU General Public License along with
|
||||
this program; if not, write to the Free Software Foundation, Inc., 51
|
||||
Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
|
||||
Author: Jan Lindström jan.lindstrom@mariadb.com
|
||||
Created: 15-07-2013
|
||||
Updated:
|
||||
*/
|
||||
#include "binlog_api.h"
|
||||
#include <getopt.h>
|
||||
#include <iostream>
|
||||
#include <iomanip>
|
||||
#include <map>
|
||||
#include <sstream>
|
||||
#include <stdlib.h>
|
||||
#include <errno.h>
|
||||
#include <string.h>
|
||||
#include <regex.h>
|
||||
#include <algorithm>
|
||||
#include "listener_exception.h"
|
||||
#include <mysql.h>
|
||||
#include <mysqld_error.h>
|
||||
#include "table_replication_metadata.h"
|
||||
#include "table_replication_consistency.h"
|
||||
#include "log_manager.h"
|
||||
|
||||
namespace mysql {
|
||||
|
||||
namespace table_replication_metadata {
|
||||
|
||||
/***********************************************************************//**
|
||||
Internal function to write error messages to the log file.
|
||||
*/
|
||||
static void
|
||||
tbrm_report_error(
|
||||
/*==============*/
|
||||
MYSQL *con, /*!< in: MySQL connection */
|
||||
const char *message, /*!< in: Error message */
|
||||
const char *file, /*!< in: File name */
|
||||
int line) /*!< in: Line number */
|
||||
{
|
||||
skygw_log_write_flush( LOGFILE_ERROR,
|
||||
(char *)"%s at file %s line %d", message, file, line);
|
||||
if (con != NULL) {
|
||||
skygw_log_write_flush( LOGFILE_ERROR,
|
||||
(char *)"%s", mysql_error(con));
|
||||
mysql_close(con);
|
||||
}
|
||||
}
|
||||
|
||||
/***********************************************************************//**
|
||||
Internal function to write statement error messages to the log file.
|
||||
*/
|
||||
static void
|
||||
tbrm_stmt_error(
|
||||
/*============*/
|
||||
MYSQL_STMT *stmt, /*!< in: MySQL statement */
|
||||
const char *message, /*!< in: Error message */
|
||||
const char *file, /*!< in: File name */
|
||||
int line) /*!< in: Line number */
|
||||
{
|
||||
skygw_log_write_flush( LOGFILE_ERROR,
|
||||
(char *)"%s at file %s line %d", message, file, line);
|
||||
|
||||
if (stmt != NULL)
|
||||
{
|
||||
skygw_log_write_flush( LOGFILE_ERROR,
|
||||
(char *)"Error %u (%s): %s\n",
|
||||
mysql_stmt_errno (stmt),
|
||||
mysql_stmt_sqlstate (stmt),
|
||||
mysql_stmt_error (stmt));
|
||||
}
|
||||
}
|
||||
|
||||
/***********************************************************************//**
|
||||
Inspect master data dictionary and if necessary table replication
|
||||
consistency metadata is not created, create it.
|
||||
@return false if create failed, true if metadata already created or
|
||||
create succeeded */
|
||||
static bool
|
||||
tbrm_create_metadata(
|
||||
/*=================*/
|
||||
const char *master_host, /*!< in: Master host name */
|
||||
const char *user, /*!< in: Username */
|
||||
const char *passwd, /*!< in: Passwd */
|
||||
unsigned int master_port) /*!< in: Master port */
|
||||
{
|
||||
MYSQL *con = mysql_init(NULL);
|
||||
unsigned int myerrno=0;
|
||||
|
||||
if (!con) {
|
||||
skygw_log_write_flush( LOGFILE_ERROR,
|
||||
(char *)"Mysql init failed", mysql_error(con));
|
||||
return false;
|
||||
}
|
||||
|
||||
mysql_options(con, MYSQL_READ_DEFAULT_GROUP, "libmysqld_client");
|
||||
mysql_options(con, MYSQL_OPT_USE_REMOTE_CONNECTION, NULL);
|
||||
|
||||
if (!mysql_real_connect(con, master_host, user, passwd, NULL, master_port, NULL, 0)) {
|
||||
tbrm_report_error(con, "Error: mysql_real_connect failed", __FILE__, __LINE__);
|
||||
goto error_exit;
|
||||
}
|
||||
|
||||
// Check is the database there
|
||||
mysql_query(con, "USE SKYSQL_GATEWAY_METADATA");
|
||||
myerrno = mysql_errno(con);
|
||||
|
||||
if (myerrno == 0) {
|
||||
// Database found, assuming everyting ok
|
||||
return true;
|
||||
} else if (myerrno != ER_BAD_DB_ERROR) {
|
||||
tbrm_report_error(con, "Error: mysql_query(USE_SKYSQL_GATEWAY_METADATA) failed", __FILE__, __LINE__);
|
||||
goto error_exit;
|
||||
}
|
||||
|
||||
// Create databse
|
||||
mysql_query(con, "CREATE DATABASE SKYSQL_GATEWAY_METADATA");
|
||||
|
||||
if (mysql_errno(con) != 0) {
|
||||
tbrm_report_error(con, "mysql_query(CREATE DATABASE SKYSQL_GATEWAY_METADATA) failed", __FILE__, __LINE__);
|
||||
goto error_exit;
|
||||
}
|
||||
|
||||
// Set correct database
|
||||
mysql_query(con, "USE SKYSQL_GATEWAY_METADATA");
|
||||
|
||||
if (mysql_errno(con) != 0) {
|
||||
tbrm_report_error(con, "Error: mysql_query(USE_SKYSQL_GATEWAY_METADATA) failed", __FILE__, __LINE__);
|
||||
goto error_exit;
|
||||
}
|
||||
|
||||
// Create consistency table
|
||||
mysql_query(con, "CREATE TABLE TABLE_REPLICATION_CONSISTENCY("
|
||||
"DB_TABLE_NAME VARCHAR(255) NOT NULL,"
|
||||
"SERVER_ID INT NOT NULL,"
|
||||
"GTID VARBINARY(255),"
|
||||
"BINLOG_POS BIGINT NOT NULL,"
|
||||
"GTID_KNOWN INT,"
|
||||
"PRIMARY KEY(DB_TABLE_NAME, SERVER_ID)) ENGINE=InnoDB");
|
||||
|
||||
if (mysql_errno(con) != 0) {
|
||||
tbrm_report_error(con, "Error: Create table failed", __FILE__, __LINE__);
|
||||
goto error_exit;
|
||||
}
|
||||
|
||||
// Above clauses not really transactional, but lets play safe
|
||||
mysql_query(con, "COMMIT");
|
||||
|
||||
if (mysql_errno(con) != 0) {
|
||||
tbrm_report_error(con, "Error: Commit failed", __FILE__, __LINE__);
|
||||
goto error_exit;
|
||||
}
|
||||
|
||||
// Create servers table
|
||||
mysql_query(con, "CREATE TABLE TABLE_REPLICATION_SERVERS("
|
||||
"SERVER_ID INT NOT NULL,"
|
||||
"BINLOG_POS BIGINT NOT NULL,"
|
||||
"GTID VARBINARY(255),"
|
||||
"GTID_KNOWN INT,"
|
||||
"SERVER_TYPE INT,"
|
||||
"PRIMARY KEY(SERVER_ID)) ENGINE=InnoDB");
|
||||
|
||||
if (mysql_errno(con) != 0) {
|
||||
tbrm_report_error(con, "Error: Create table failed", __FILE__, __LINE__);
|
||||
goto error_exit;
|
||||
}
|
||||
|
||||
// Above clauses not really transactional, but lets play safe
|
||||
mysql_query(con, "COMMIT");
|
||||
|
||||
if (mysql_errno(con) != 0) {
|
||||
tbrm_report_error(con, "Error: Commit failed", __FILE__, __LINE__);
|
||||
goto error_exit;
|
||||
}
|
||||
|
||||
mysql_close(con);
|
||||
|
||||
// Done
|
||||
return true;
|
||||
|
||||
error_exit:
|
||||
|
||||
if (con) {
|
||||
mysql_close(con);
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/***********************************************************************//**
|
||||
Read table replication consistency metadata from the MySQL master server.
|
||||
This function will create necessary database and table if they are not
|
||||
yet created.
|
||||
@return false if read failed, true if read succeeded */
|
||||
bool
|
||||
tbrm_read_consistency_metadata(
|
||||
/*===========================*/
|
||||
const char *master_host, /*!< in: Master hostname */
|
||||
const char *user, /*!< in: username */
|
||||
const char *passwd, /*!< in: password */
|
||||
unsigned int master_port, /*!< in: master port */
|
||||
tbr_metadata_t **tbrm_meta, /*!< out: table replication consistency
|
||||
metadata. */
|
||||
size_t *tbrm_rows) /*!< out: number of rows read */
|
||||
{
|
||||
unsigned int myerrno=0;
|
||||
boost::uint64_t nrows=0;
|
||||
boost::uint64_t i=0;
|
||||
MYSQL_RES *result = NULL;
|
||||
tbr_metadata_t *tm=NULL;
|
||||
|
||||
tbrm_create_metadata(master_host, user, passwd, master_port);
|
||||
|
||||
MYSQL *con = mysql_init(NULL);
|
||||
|
||||
if (!con) {
|
||||
skygw_log_write_flush( LOGFILE_ERROR,
|
||||
(char *)"Error: MySQL init failed");
|
||||
return false;
|
||||
}
|
||||
|
||||
mysql_options(con, MYSQL_READ_DEFAULT_GROUP, "libmysqld_client");
|
||||
mysql_options(con, MYSQL_OPT_USE_REMOTE_CONNECTION, NULL);
|
||||
|
||||
if (!mysql_real_connect(con, master_host, user, passwd, NULL, master_port, NULL, 0)) {
|
||||
tbrm_report_error(con, "Error: mysql_real_connect failed", __FILE__, __LINE__);
|
||||
goto error_exit;
|
||||
}
|
||||
|
||||
mysql_query(con, "USE SKYSQL_GATEWAY_METADATA");
|
||||
myerrno = mysql_errno(con);
|
||||
|
||||
if (myerrno != 0) {
|
||||
tbrm_report_error(con, "Error: Database set failed", __FILE__, __LINE__);
|
||||
goto error_exit;
|
||||
}
|
||||
|
||||
mysql_query(con, "SELECT * FROM TABLE_REPLICATION_CONSISTENCY");
|
||||
myerrno = mysql_errno(con);
|
||||
|
||||
if (myerrno != 0) {
|
||||
tbrm_report_error(con,"Error: Select from table_replication_consistency failed", __FILE__, __LINE__);
|
||||
goto error_exit;
|
||||
}
|
||||
|
||||
result = mysql_store_result(con);
|
||||
|
||||
if (!result) {
|
||||
tbrm_report_error(con, "Error: mysql_store_result failed", __FILE__, __LINE__);
|
||||
goto error_exit;
|
||||
}
|
||||
|
||||
nrows = mysql_num_rows(result);
|
||||
|
||||
tm = (tbr_metadata_t*) malloc(nrows * sizeof(tbr_metadata_t));
|
||||
|
||||
if (!tm) {
|
||||
skygw_log_write_flush( LOGFILE_ERROR,
|
||||
(char *)"Error: Out of memory");
|
||||
goto error_exit;
|
||||
}
|
||||
|
||||
memset(tm, 0, nrows * sizeof(tbr_metadata_t));
|
||||
*tbrm_rows = nrows;
|
||||
*tbrm_meta = tm;
|
||||
|
||||
for(i=0;i < nrows; i++) {
|
||||
MYSQL_ROW row = mysql_fetch_row(result);
|
||||
unsigned long *lengths = mysql_fetch_lengths(result);
|
||||
// DB_TABLE_NAME
|
||||
tm[i].db_table = (unsigned char *)malloc(lengths[0]);
|
||||
|
||||
if (!tm[i].db_table) {
|
||||
skygw_log_write_flush( LOGFILE_ERROR,
|
||||
(char *)"Error: Out of memory");
|
||||
goto error_exit;
|
||||
}
|
||||
|
||||
strcpy((char *)tm[i].db_table, row[0]);
|
||||
// SERVER_ID
|
||||
tm[i].server_id = atol(row[1]);
|
||||
// GTID
|
||||
tm[i].gtid = (unsigned char *)malloc((lengths[2])*sizeof(unsigned char));
|
||||
|
||||
if (!tm[i].gtid) {
|
||||
free(tm[i].db_table);
|
||||
skygw_log_write_flush( LOGFILE_ERROR,
|
||||
(char *)"Error: Out of memory");
|
||||
goto error_exit;
|
||||
}
|
||||
|
||||
memcpy(tm[i].gtid, row[2], lengths[2]);
|
||||
tm[i].gtid_len = lengths[2];
|
||||
// BINLOG_POS
|
||||
tm[i].binlog_pos = atoll(row[3]);
|
||||
// GTID_KNOWN
|
||||
tm[i].gtid_known = atol(row[4]);
|
||||
}
|
||||
|
||||
mysql_free_result(result);
|
||||
mysql_close(con);
|
||||
|
||||
return true;
|
||||
|
||||
error_exit:
|
||||
|
||||
if (tm) {
|
||||
for(size_t k=0;i < i; k++) {
|
||||
free(tm[k].db_table);
|
||||
free(tm[k].gtid);
|
||||
}
|
||||
free(tm);
|
||||
*tbrm_rows = 0;
|
||||
*tbrm_meta = NULL;
|
||||
}
|
||||
|
||||
if (result) {
|
||||
mysql_free_result(result);
|
||||
}
|
||||
|
||||
if (con) {
|
||||
mysql_close(con);
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/***********************************************************************//**
|
||||
Write table replication consistency metadata from the MySQL master server.
|
||||
This function assumes that necessary database and table are created.
|
||||
@return false if read failed, true if read succeeded */
|
||||
bool
|
||||
tbrm_write_consistency_metadata(
|
||||
/*============================*/
|
||||
const char *master_host, /*!< in: Master hostname */
|
||||
const char *user, /*!< in: username */
|
||||
const char *passwd, /*!< in: password */
|
||||
unsigned int master_port, /*!< in: master port */
|
||||
tbr_metadata_t **tbrm_meta, /*!< in: table replication consistency
|
||||
metadata. */
|
||||
size_t tbrm_rows) /*!< in: number of rows read */
|
||||
{
|
||||
int myerrno=0;
|
||||
boost::uint32_t i;
|
||||
MYSQL_STMT *sstmt=NULL;
|
||||
MYSQL_STMT *istmt=NULL;
|
||||
MYSQL_STMT *ustmt=NULL;
|
||||
MYSQL_BIND sparam[2];
|
||||
MYSQL_BIND iparam[5];
|
||||
MYSQL_BIND uparam[5];
|
||||
MYSQL_BIND result[1];
|
||||
char *dbtable=NULL;
|
||||
void *gtid=NULL;
|
||||
int gtidknown;
|
||||
int serverid;
|
||||
boost::uint64_t binlogpos;
|
||||
|
||||
// Query to find out if the row already exists on table
|
||||
const char *sst = "SELECT BINLOG_POS FROM TABLE_REPLICATION_CONSISTENCY WHERE"
|
||||
" DB_TABLE_NAME=? and SERVER_ID=?";
|
||||
|
||||
// Insert Query
|
||||
const char *ist = "INSERT INTO TABLE_REPLICATION_CONSISTENCY(DB_TABLE_NAME,"
|
||||
" SERVER_ID, GTID, BINLOG_POS, GTID_KNOWN) VALUES"
|
||||
"(?, ?, ?, ?, ?)";
|
||||
|
||||
// Update Query
|
||||
const char *ust = "UPDATE TABLE_REPLICATION_CONSISTENCY "
|
||||
"SET GTID=?, BINLOG_POS=?, GTID_KNOWN=?"
|
||||
" WHERE DB_TABLE_NAME=? AND SERVER_ID=?";
|
||||
|
||||
MYSQL *con = mysql_init(NULL);
|
||||
|
||||
if (!con) {
|
||||
skygw_log_write_flush( LOGFILE_ERROR,
|
||||
(char *)"Mysql init failed", mysql_error(con));
|
||||
return false;
|
||||
}
|
||||
|
||||
mysql_options(con, MYSQL_READ_DEFAULT_GROUP, "libmysqld_client");
|
||||
mysql_options(con, MYSQL_OPT_USE_REMOTE_CONNECTION, NULL);
|
||||
|
||||
if (!mysql_real_connect(con, master_host, user, passwd, NULL, master_port, NULL, 0)) {
|
||||
tbrm_report_error(con, "Error: mysql_real_connect failed", __FILE__, __LINE__);
|
||||
goto error_exit;
|
||||
}
|
||||
|
||||
mysql_query(con, "USE SKYSQL_GATEWAY_METADATA");
|
||||
myerrno = mysql_errno(con);
|
||||
|
||||
if (myerrno != 0) {
|
||||
tbrm_report_error(con, "Error: Database set failed", __FILE__, __LINE__);
|
||||
}
|
||||
|
||||
// Allocate statement handlers
|
||||
sstmt = mysql_stmt_init(con);
|
||||
istmt = mysql_stmt_init(con);
|
||||
ustmt = mysql_stmt_init(con);
|
||||
|
||||
if (sstmt == NULL || istmt == NULL || ustmt == NULL) {
|
||||
tbrm_report_error(con, "Could not initialize statement handler", __FILE__, __LINE__);
|
||||
goto error_exit;
|
||||
}
|
||||
|
||||
// Prepare the statements
|
||||
if (mysql_stmt_prepare(sstmt, sst, strlen(sst)) != 0) {
|
||||
tbrm_stmt_error(sstmt, "Error: Could not prepare select statement", __FILE__, __LINE__);
|
||||
goto error_exit;
|
||||
}
|
||||
if (mysql_stmt_prepare(istmt, ist, strlen(ist)) != 0) {
|
||||
tbrm_stmt_error(istmt, "Error: Could not prepare insert statement", __FILE__, __LINE__);
|
||||
goto error_exit;
|
||||
}
|
||||
if (mysql_stmt_prepare(ustmt, ust, strlen(ust)) != 0) {
|
||||
tbrm_stmt_error(ustmt, "Error: Could not prepare update statement", __FILE__, __LINE__);
|
||||
goto error_exit;
|
||||
}
|
||||
|
||||
// Initialize the parameters
|
||||
memset (sparam, 0, sizeof (sparam));
|
||||
memset (iparam, 0, sizeof (iparam));
|
||||
memset (uparam, 0, sizeof (uparam));
|
||||
memset (result, 0, sizeof (result));
|
||||
|
||||
// Init param structure
|
||||
// Select
|
||||
sparam[0].buffer_type = MYSQL_TYPE_VARCHAR;
|
||||
sparam[1].buffer_type = MYSQL_TYPE_LONG;
|
||||
sparam[1].buffer = (void *) &serverid;
|
||||
// Insert
|
||||
iparam[0].buffer_type = MYSQL_TYPE_VARCHAR;
|
||||
iparam[1].buffer_type = MYSQL_TYPE_LONG;
|
||||
iparam[1].buffer = (void *) &serverid;
|
||||
iparam[2].buffer_type = MYSQL_TYPE_BLOB;
|
||||
iparam[3].buffer_type = MYSQL_TYPE_LONGLONG;
|
||||
iparam[3].buffer = (void *) &binlogpos;
|
||||
iparam[4].buffer_type = MYSQL_TYPE_SHORT;
|
||||
iparam[4].buffer = (void *) >idknown;
|
||||
// Update
|
||||
uparam[0].buffer_type = MYSQL_TYPE_BLOB;
|
||||
uparam[1].buffer_type = MYSQL_TYPE_LONGLONG;
|
||||
uparam[1].buffer = (void *) &binlogpos;
|
||||
uparam[2].buffer_type = MYSQL_TYPE_SHORT;
|
||||
uparam[2].buffer = (void *) >idknown;
|
||||
uparam[3].buffer_type = MYSQL_TYPE_VARCHAR;
|
||||
uparam[4].buffer_type = MYSQL_TYPE_LONG;
|
||||
uparam[4].buffer = (void *) &serverid;
|
||||
// Result set for select
|
||||
result[0].buffer_type = MYSQL_TYPE_LONGLONG;
|
||||
result[0].buffer = &binlogpos;
|
||||
|
||||
|
||||
// Iterate through the data
|
||||
for(i = 0; i < tbrm_rows; i++) {
|
||||
// Start from Select, we need to know if the consistency
|
||||
// information for this table, server pair is already
|
||||
// in metadata or not.
|
||||
|
||||
dbtable = (char *)tbrm_meta[i]->db_table;
|
||||
gtid = (char *)tbrm_meta[i]->gtid;
|
||||
gtidknown = tbrm_meta[i]->gtid_known;
|
||||
serverid = tbrm_meta[i]->server_id;
|
||||
uparam[3].buffer = (void *) dbtable;
|
||||
|
||||
sparam[0].buffer = (void *) dbtable;
|
||||
uparam[0].buffer = (void *) gtid;
|
||||
iparam[0].buffer = (void *) dbtable;
|
||||
iparam[2].buffer = (void *) gtid;
|
||||
sparam[0].buffer_length = strlen(dbtable);
|
||||
uparam[3].buffer_length = sparam[0].buffer_length;
|
||||
iparam[0].buffer_length = sparam[0].buffer_length;
|
||||
uparam[0].buffer_length = tbrm_meta[i]->gtid_len;
|
||||
iparam[2].buffer_length = tbrm_meta[i]->gtid_len;
|
||||
|
||||
// Bind param structure to statement
|
||||
if (mysql_stmt_bind_param(sstmt, sparam) != 0) {
|
||||
tbrm_stmt_error(sstmt, "Error: Could not bind select parameters", __FILE__, __LINE__);
|
||||
goto error_exit;
|
||||
}
|
||||
|
||||
// Bind result structure to statement
|
||||
if (mysql_stmt_bind_result(sstmt, result) != 0) {
|
||||
tbrm_stmt_error(sstmt, "Error: Could not bind select return parameters", __FILE__, __LINE__);
|
||||
goto error_exit;
|
||||
}
|
||||
|
||||
// Execute!!
|
||||
if (mysql_stmt_execute(sstmt) != 0) {
|
||||
tbrm_stmt_error(sstmt, "Error: Could not execute select statement", __FILE__, __LINE__);
|
||||
goto error_exit;
|
||||
}
|
||||
|
||||
// Store result
|
||||
if (mysql_stmt_store_result(sstmt) != 0) {
|
||||
tbrm_stmt_error(sstmt, "Error: Could not buffer result set", __FILE__, __LINE__);
|
||||
goto error_exit;
|
||||
}
|
||||
|
||||
// Fetch result
|
||||
myerrno = mysql_stmt_fetch(sstmt);
|
||||
if (myerrno != 0 && myerrno != MYSQL_NO_DATA) {
|
||||
tbrm_stmt_error(sstmt, "Error: Could not fetch result set", __FILE__, __LINE__);
|
||||
goto error_exit;
|
||||
}
|
||||
|
||||
// If fetch returned 0 rows, it means that this table, serverid
|
||||
// pair was found from metadata, we might need to update
|
||||
// the consistency information.
|
||||
if (myerrno == 0) {
|
||||
// We update the consistency if and only if the
|
||||
// binlog position for this table has changed
|
||||
if (binlogpos != tbrm_meta[i]->binlog_pos) {
|
||||
// Update the consistency information
|
||||
binlogpos = tbrm_meta[i]->binlog_pos;
|
||||
|
||||
// Bind param structure to statement
|
||||
if (mysql_stmt_bind_param(ustmt, uparam) != 0) {
|
||||
tbrm_stmt_error(ustmt, "Error: Could not bind update parameters", __FILE__, __LINE__);
|
||||
goto error_exit;
|
||||
}
|
||||
// Execute!!
|
||||
if (mysql_stmt_execute(ustmt) != 0) {
|
||||
tbrm_stmt_error(ustmt, "Error: Could not execute update statement", __FILE__, __LINE__);
|
||||
goto error_exit;
|
||||
}
|
||||
if (tbr_debug) {
|
||||
skygw_log_write_flush( LOGFILE_TRACE,
|
||||
(char *)"TRC Debug: Metadata state updated for %s in server %d is binlog_pos %lu gtid '%s'",
|
||||
dbtable, serverid, binlogpos, gtid);
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
// Insert the consistency information
|
||||
binlogpos = tbrm_meta[i]->binlog_pos;
|
||||
// Bind param structure to statement
|
||||
if (mysql_stmt_bind_param(istmt, iparam) != 0) {
|
||||
tbrm_stmt_error(istmt, "Error: Could not bind insert parameters", __FILE__, __LINE__);
|
||||
goto error_exit;
|
||||
}
|
||||
|
||||
// Execute!!
|
||||
if (mysql_stmt_execute(istmt) != 0) {
|
||||
tbrm_stmt_error(istmt, "Error: Could not execute insert statement", __FILE__, __LINE__);
|
||||
goto error_exit;
|
||||
}
|
||||
|
||||
if (tbr_debug) {
|
||||
skygw_log_write_flush( LOGFILE_TRACE,
|
||||
(char *)"TRC Debug: Metadata state inserted for %s in server %d is binlog_pos %lu gtid '%s'",
|
||||
dbtable, serverid, binlogpos, gtid);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
|
||||
error_exit:
|
||||
// Cleanup
|
||||
if (sstmt) {
|
||||
if (mysql_stmt_close(sstmt)) {
|
||||
tbrm_stmt_error(sstmt, "Error: Could not close select statement", __FILE__, __LINE__);
|
||||
}
|
||||
}
|
||||
|
||||
if (istmt) {
|
||||
if (mysql_stmt_close(istmt)) {
|
||||
tbrm_stmt_error(istmt, "Error: Could not close select statement", __FILE__, __LINE__);
|
||||
}
|
||||
}
|
||||
|
||||
if (ustmt) {
|
||||
if (mysql_stmt_close(ustmt)) {
|
||||
tbrm_stmt_error(ustmt, "Error: Could not close select statement", __FILE__, __LINE__);
|
||||
}
|
||||
}
|
||||
|
||||
if (con) {
|
||||
mysql_close(con);
|
||||
}
|
||||
|
||||
return false;
|
||||
|
||||
}
|
||||
|
||||
/***********************************************************************//**
|
||||
Read table replication server metadata from the MySQL master server.
|
||||
This function will create necessary database and table if they are not
|
||||
yet created.
|
||||
@return false if read failed, true if read succeeded */
|
||||
bool
|
||||
tbrm_read_server_metadata(
|
||||
/*======================*/
|
||||
const char *master_host, /*!< in: Master hostname */
|
||||
const char *user, /*!< in: username */
|
||||
const char *passwd, /*!< in: password */
|
||||
unsigned int master_port, /*!< in: master port */
|
||||
tbr_server_t **tbrm_servers,/*!< out: table replication server
|
||||
metadata. */
|
||||
size_t *tbrm_rows) /*!< out: number of rows read */
|
||||
{
|
||||
unsigned int myerrno=0;
|
||||
boost::uint64_t nrows=0;
|
||||
boost::uint64_t i=0;
|
||||
MYSQL_RES *result = NULL;
|
||||
tbr_server_t *ts=NULL;
|
||||
|
||||
tbrm_create_metadata(master_host, user, passwd, master_port);
|
||||
|
||||
MYSQL *con = mysql_init(NULL);
|
||||
|
||||
if (!con) {
|
||||
skygw_log_write_flush( LOGFILE_ERROR,
|
||||
(char *)"Mysql init failed", mysql_error(con));
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
mysql_options(con, MYSQL_READ_DEFAULT_GROUP, "libmysqld_client");
|
||||
mysql_options(con, MYSQL_OPT_USE_REMOTE_CONNECTION, NULL);
|
||||
|
||||
if (!mysql_real_connect(con, master_host, user, passwd, NULL, master_port, NULL, 0)) {
|
||||
tbrm_report_error(con, "Error: mysql_real_connect failed", __FILE__, __LINE__);
|
||||
goto error_exit;
|
||||
}
|
||||
|
||||
mysql_query(con, "USE SKYSQL_GATEWAY_METADATA");
|
||||
myerrno = mysql_errno(con);
|
||||
|
||||
if (myerrno != 0) {
|
||||
tbrm_report_error(con, "Error: Database set failed", __FILE__, __LINE__);
|
||||
goto error_exit;
|
||||
}
|
||||
|
||||
mysql_query(con, "SELECT * FROM TABLE_REPLICATION_SERVERS");
|
||||
myerrno = mysql_errno(con);
|
||||
|
||||
if (myerrno != 0) {
|
||||
tbrm_report_error(con,"Error: Select from table_replication_consistency failed", __FILE__, __LINE__);
|
||||
goto error_exit;
|
||||
}
|
||||
|
||||
result = mysql_store_result(con);
|
||||
|
||||
if (!result) {
|
||||
tbrm_report_error(con, "Error: mysql_store_result failed", __FILE__, __LINE__);
|
||||
goto error_exit;
|
||||
}
|
||||
|
||||
nrows = mysql_num_rows(result);
|
||||
|
||||
ts = (tbr_server_t*) malloc(nrows * sizeof(tbr_server_t));
|
||||
|
||||
if(!ts) {
|
||||
skygw_log_write_flush( LOGFILE_ERROR,
|
||||
(char *)"Error: Out of memory");
|
||||
goto error_exit;
|
||||
}
|
||||
|
||||
*tbrm_rows = nrows;
|
||||
*tbrm_servers = ts;
|
||||
|
||||
for(i=0;i < nrows; i++) {
|
||||
MYSQL_ROW row = mysql_fetch_row(result);
|
||||
unsigned long *lengths = mysql_fetch_lengths(result);
|
||||
// SERVER_ID
|
||||
ts[i].server_id = atol(row[0]);
|
||||
// BINLOG_POS
|
||||
ts[i].binlog_pos = atoll(row[1]);
|
||||
// GTID
|
||||
ts[i].gtid = (unsigned char *)malloc((lengths[2])*sizeof(unsigned char));
|
||||
|
||||
if (!ts[i].gtid) {
|
||||
skygw_log_write_flush( LOGFILE_ERROR,
|
||||
(char *)"Error: Out of memory");
|
||||
goto error_exit;
|
||||
}
|
||||
|
||||
memcpy(ts[i].gtid, row[2], lengths[2]);
|
||||
ts[i].gtid_len = lengths[2];
|
||||
// GTID_KNOWN
|
||||
ts[i].gtid_known = atol(row[3]);
|
||||
// SERVER_TYPE
|
||||
ts[i].server_type = atol(row[4]);
|
||||
}
|
||||
|
||||
mysql_free_result(result);
|
||||
mysql_close(con);
|
||||
|
||||
return true;
|
||||
|
||||
error_exit:
|
||||
if (ts) {
|
||||
for(size_t k=0;i < i; k++) {
|
||||
free(ts[k].gtid);
|
||||
}
|
||||
free(ts);
|
||||
*tbrm_rows = 0;
|
||||
*tbrm_servers = NULL;
|
||||
}
|
||||
|
||||
if (result) {
|
||||
mysql_free_result(result);
|
||||
}
|
||||
|
||||
if (con) {
|
||||
mysql_close(con);
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/***********************************************************************//**
|
||||
Write table replication server metadata from the MySQL master server.
|
||||
This function assumes that necessary database and table are created.
|
||||
@return false if read failed, true if read succeeded */
|
||||
bool
|
||||
tbrm_write_server_metadata(
|
||||
/*=======================*/
|
||||
const char *master_host, /*!< in: Master hostname */
|
||||
const char *user, /*!< in: username */
|
||||
const char *passwd, /*!< in: password */
|
||||
unsigned int master_port, /*!< in: master port */
|
||||
tbr_server_t **tbrm_servers,/*!< in: table replication server
|
||||
metadata. */
|
||||
size_t tbrm_rows) /*!< in: number of rows read */
|
||||
{
|
||||
int myerrno=0;
|
||||
boost::uint32_t i;
|
||||
MYSQL_STMT *sstmt=NULL;
|
||||
MYSQL_STMT *istmt=NULL;
|
||||
MYSQL_STMT *ustmt=NULL;
|
||||
MYSQL_BIND sparam[1];
|
||||
MYSQL_BIND iparam[5];
|
||||
MYSQL_BIND uparam[4];
|
||||
MYSQL_BIND result[1];
|
||||
char *dbtable;
|
||||
void *gtid;
|
||||
int gtidknown;
|
||||
unsigned int serverid;
|
||||
int servertype;
|
||||
boost::uint64_t binlogpos;
|
||||
|
||||
// Query to find out if the row already exists on table
|
||||
const char *sst = "SELECT BINLOG_POS FROM TABLE_REPLICATION_CONSISTENCY WHERE"
|
||||
" SERVER_ID=?";
|
||||
|
||||
// Insert Query
|
||||
const char *ist = "INSERT INTO TABLE_REPLICATION_SERVERS("
|
||||
" SERVER_ID, GTID, BINLOG_POS, GTID_KNOWN, SERVER_TYPE) VALUES"
|
||||
"(?, ?, ?, ?, ?)";
|
||||
|
||||
// Update Query
|
||||
const char *ust = "UPDATE TABLE_REPLICATION_SERVERS "
|
||||
"SET GTID=?, BINLOG_POS=?, GTID_KNOWN=?"
|
||||
" WHERE SERVER_ID=?";
|
||||
|
||||
MYSQL *con = mysql_init(NULL);
|
||||
|
||||
if (!con) {
|
||||
skygw_log_write_flush( LOGFILE_ERROR,
|
||||
(char *)"Mysql init failed", mysql_error(con));
|
||||
return false;
|
||||
}
|
||||
|
||||
mysql_options(con, MYSQL_READ_DEFAULT_GROUP, "libmysqld_client");
|
||||
mysql_options(con, MYSQL_OPT_USE_REMOTE_CONNECTION, NULL);
|
||||
|
||||
if (!mysql_real_connect(con, master_host, user, passwd, NULL, master_port, NULL, 0)) {
|
||||
tbrm_report_error(con, "Error: mysql_real_connect failed", __FILE__, __LINE__);
|
||||
goto error_exit;
|
||||
}
|
||||
|
||||
mysql_query(con, "USE SKYSQL_GATEWAY_METADATA");
|
||||
myerrno = mysql_errno(con);
|
||||
|
||||
if (myerrno != 0) {
|
||||
tbrm_report_error(con, "Error: Database set failed", __FILE__, __LINE__);
|
||||
}
|
||||
|
||||
// Allocate statement handlers
|
||||
sstmt = mysql_stmt_init(con);
|
||||
istmt = mysql_stmt_init(con);
|
||||
ustmt = mysql_stmt_init(con);
|
||||
|
||||
if (sstmt == NULL || istmt == NULL || ustmt == NULL) {
|
||||
tbrm_report_error(con, "Could not initialize statement handler", __FILE__, __LINE__);
|
||||
goto error_exit;
|
||||
}
|
||||
|
||||
// Prepare the statements
|
||||
if (mysql_stmt_prepare(sstmt, sst, strlen(sst)) != 0) {
|
||||
tbrm_stmt_error(sstmt, "Error: Could not prepare select statement", __FILE__, __LINE__);
|
||||
goto error_exit;
|
||||
}
|
||||
if (mysql_stmt_prepare(istmt, ist, strlen(ist)) != 0) {
|
||||
tbrm_stmt_error(istmt, "Error: Could not prepare insert statement", __FILE__, __LINE__);
|
||||
goto error_exit;
|
||||
}
|
||||
if (mysql_stmt_prepare(ustmt, ust, strlen(ust)) != 0) {
|
||||
tbrm_stmt_error(ustmt, "Error: Could not prepare update statement", __FILE__, __LINE__);
|
||||
goto error_exit;
|
||||
}
|
||||
|
||||
// Initialize the parameters
|
||||
memset (sparam, 0, sizeof (sparam));
|
||||
memset (iparam, 0, sizeof (iparam));
|
||||
memset (uparam, 0, sizeof (uparam));
|
||||
memset (result, 0, sizeof (result));
|
||||
|
||||
// Init param structure
|
||||
// Select
|
||||
sparam[0].buffer_type = MYSQL_TYPE_LONG;
|
||||
sparam[0].buffer = (void *) &serverid;
|
||||
// Insert
|
||||
iparam[0].buffer_type = MYSQL_TYPE_LONG;
|
||||
iparam[0].buffer = (void *) &serverid;
|
||||
iparam[1].buffer_type = MYSQL_TYPE_BLOB;
|
||||
iparam[2].buffer_type = MYSQL_TYPE_LONGLONG;
|
||||
iparam[2].buffer = (void *) &binlogpos;
|
||||
iparam[3].buffer_type = MYSQL_TYPE_SHORT;
|
||||
iparam[3].buffer = (void *) >idknown;
|
||||
iparam[4].buffer_type = MYSQL_TYPE_LONG;
|
||||
iparam[4].buffer = (void *) &servertype;
|
||||
// Update
|
||||
uparam[0].buffer_type = MYSQL_TYPE_BLOB;
|
||||
uparam[1].buffer_type = MYSQL_TYPE_LONGLONG;
|
||||
uparam[1].buffer = (void *) &binlogpos;
|
||||
uparam[2].buffer_type = MYSQL_TYPE_SHORT;
|
||||
uparam[2].buffer = (void *) >idknown;
|
||||
uparam[3].buffer_type = MYSQL_TYPE_LONG;
|
||||
uparam[3].buffer = (void *) &serverid;
|
||||
// Result set for select
|
||||
result[0].buffer_type = MYSQL_TYPE_LONGLONG;
|
||||
result[0].buffer = &binlogpos;
|
||||
|
||||
|
||||
// Iterate through the data
|
||||
for(i = 0; i < tbrm_rows; i++) {
|
||||
// Start from Select, we need to know if the consistency
|
||||
// information for this table, server pair is already
|
||||
// in metadata or not.
|
||||
|
||||
gtid = (char *)tbrm_servers[i]->gtid;
|
||||
gtidknown = tbrm_servers[i]->gtid_known;
|
||||
serverid = tbrm_servers[i]->server_id;
|
||||
servertype = tbrm_servers[i]->server_type;
|
||||
|
||||
iparam[1].buffer = (void *) gtid;
|
||||
uparam[0].buffer = (void *) gtid;
|
||||
uparam[0].buffer_length = tbrm_servers[i]->gtid_len;
|
||||
iparam[1].buffer_length = tbrm_servers[i]->gtid_len;
|
||||
|
||||
// Bind param structure to statement
|
||||
if (mysql_stmt_bind_param(sstmt, sparam) != 0) {
|
||||
tbrm_stmt_error(sstmt, "Error: Could not bind select parameters", __FILE__, __LINE__);
|
||||
goto error_exit;
|
||||
}
|
||||
|
||||
// Bind result structure to statement
|
||||
if (mysql_stmt_bind_result(sstmt, result) != 0) {
|
||||
tbrm_stmt_error(sstmt, "Error: Could not bind select return parameters", __FILE__, __LINE__);
|
||||
goto error_exit;
|
||||
}
|
||||
|
||||
// Execute!!
|
||||
if (mysql_stmt_execute(sstmt) != 0) {
|
||||
tbrm_stmt_error(sstmt, "Error: Could not execute select statement", __FILE__, __LINE__);
|
||||
goto error_exit;
|
||||
}
|
||||
|
||||
// Store result
|
||||
if (mysql_stmt_store_result(sstmt) != 0) {
|
||||
tbrm_stmt_error(sstmt, "Error: Could not buffer result set", __FILE__, __LINE__);
|
||||
goto error_exit;
|
||||
}
|
||||
|
||||
// Fetch result
|
||||
myerrno = mysql_stmt_fetch(sstmt);
|
||||
if (myerrno != 0 && myerrno != MYSQL_NO_DATA) {
|
||||
tbrm_stmt_error(sstmt, "Error: Could not fetch result set", __FILE__, __LINE__);
|
||||
goto error_exit;
|
||||
}
|
||||
|
||||
// If fetch returned 0 rows, it means that this table, serverid
|
||||
// pair was found from metadata, we might need to update
|
||||
// the consistency information.
|
||||
if (myerrno == 0) {
|
||||
// We update the consistency if and only if the
|
||||
// binlog position for this table has changed
|
||||
if (binlogpos != tbrm_servers[i]->binlog_pos) {
|
||||
// Update the consistency information
|
||||
binlogpos = tbrm_servers[i]->binlog_pos;
|
||||
|
||||
// Bind param structure to statement
|
||||
if (mysql_stmt_bind_param(ustmt, uparam) != 0) {
|
||||
tbrm_stmt_error(ustmt, "Error: Could not bind update parameters", __FILE__, __LINE__);
|
||||
goto error_exit;
|
||||
}
|
||||
// Execute!!
|
||||
if (mysql_stmt_execute(ustmt) != 0) {
|
||||
tbrm_stmt_error(ustmt, "Error: Could not execute update statement", __FILE__, __LINE__);
|
||||
goto error_exit;
|
||||
}
|
||||
if (tbr_debug) {
|
||||
skygw_log_write_flush( LOGFILE_TRACE,
|
||||
(char *)"TRC Debug: Metadata state updated for %s in server %d is binlog_pos %lu gtid '%s'",
|
||||
dbtable, serverid, binlogpos, gtid);
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
// Insert the consistency information
|
||||
binlogpos = tbrm_servers[i]->binlog_pos;
|
||||
// Bind param structure to statement
|
||||
if (mysql_stmt_bind_param(istmt, iparam) != 0) {
|
||||
tbrm_stmt_error(istmt, "Error: Could not bind insert parameters", __FILE__, __LINE__);
|
||||
goto error_exit;
|
||||
}
|
||||
|
||||
// Execute!!
|
||||
if (mysql_stmt_execute(istmt) != 0) {
|
||||
tbrm_stmt_error(istmt, "Error: Could not execute insert statement", __FILE__, __LINE__);
|
||||
goto error_exit;
|
||||
}
|
||||
|
||||
if (tbr_debug) {
|
||||
skygw_log_write_flush( LOGFILE_TRACE,
|
||||
(char *)"TRC Debug: Metadata state inserted for %s in server %d is binlog_pos %lu gtid '%s'",
|
||||
dbtable, serverid, binlogpos, gtid);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
|
||||
error_exit:
|
||||
// Cleanup
|
||||
if (sstmt) {
|
||||
if (mysql_stmt_close(sstmt)) {
|
||||
tbrm_stmt_error(sstmt, "Error: Could not close select statement", __FILE__, __LINE__);
|
||||
}
|
||||
}
|
||||
|
||||
if (istmt) {
|
||||
if (mysql_stmt_close(istmt)) {
|
||||
tbrm_stmt_error(istmt, "Error: Could not close select statement", __FILE__, __LINE__);
|
||||
}
|
||||
}
|
||||
|
||||
if (ustmt) {
|
||||
if (mysql_stmt_close(ustmt)) {
|
||||
tbrm_stmt_error(ustmt, "Error: Could not close select statement", __FILE__, __LINE__);
|
||||
}
|
||||
}
|
||||
|
||||
if (con) {
|
||||
mysql_close(con);
|
||||
}
|
||||
|
||||
return false;
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
} // table_replication_metadata
|
||||
|
||||
} // mysql
|
@ -1,126 +0,0 @@
|
||||
/*
|
||||
Copyright (C) 2013-2014, MariaDB Corporation Ab
|
||||
|
||||
|
||||
This file is distributed as part of the MariaDB Corporation MaxScale. It is free
|
||||
software: you can redistribute it and/or modify it under the terms of the
|
||||
GNU General Public License as published by the Free Software Foundation,
|
||||
version 2.
|
||||
|
||||
This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
||||
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
|
||||
details.
|
||||
|
||||
You should have received a copy of the GNU General Public License along with
|
||||
this program; if not, write to the Free Software Foundation, Inc., 51
|
||||
Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
|
||||
Author: Jan Lindström jan.lindstrom@mariadb.com
|
||||
Created: 15-07-2013
|
||||
Updated:
|
||||
*/
|
||||
|
||||
#ifndef TABLE_REPLICATION_METADATA_H
|
||||
#define TABLE_REPLICATION_METADATA_H
|
||||
|
||||
namespace mysql {
|
||||
|
||||
namespace table_replication_metadata {
|
||||
|
||||
|
||||
/* Structure definition for table replication consistency metadata */
|
||||
typedef struct {
|
||||
unsigned char* db_table; /* Fully qualified db.table name,
|
||||
primary key. */
|
||||
boost::uint32_t server_id; /* Server id */
|
||||
unsigned char* gtid; /* Global transaction id */
|
||||
boost::uint32_t gtid_len; /* Length of gtid */
|
||||
boost::uint64_t binlog_pos; /* Binlog position */
|
||||
bool gtid_known; /* Is gtid known ? */
|
||||
} tbr_metadata_t;
|
||||
|
||||
/* Structure definition for table replication server metadata */
|
||||
typedef struct {
|
||||
boost::uint32_t server_id; /* Server id, primary key*/
|
||||
boost::uint64_t binlog_pos; /* Last executed binlog position */
|
||||
unsigned char* gtid; /* Last executed global transaction
|
||||
id if known */
|
||||
boost::uint32_t gtid_len; /* Actual length of gtid */
|
||||
bool gtid_known; /* 1 if gtid known, 0 if not */
|
||||
boost::uint32_t server_type; /* server type */
|
||||
} tbr_server_t;
|
||||
|
||||
// Not really nice, but currently we support only these two
|
||||
// server types.
|
||||
enum trc_server_type { TRC_SERVER_TYPE_MARIADB = 1, TRC_SERVER_TYPE_MYSQL = 2 };
|
||||
|
||||
|
||||
/***********************************************************************//**
|
||||
Read table replication consistency metadata from the MySQL master server.
|
||||
This function assumes that necessary database and table are created.
|
||||
@return false if read failed, true if read succeeded */
|
||||
bool
|
||||
tbrm_read_consistency_metadata(
|
||||
/*===========================*/
|
||||
const char *master_host, /*!< in: Master hostname */
|
||||
const char *user, /*!< in: username */
|
||||
const char *passwd, /*!< in: password */
|
||||
unsigned int master_port, /*!< in: master port */
|
||||
tbr_metadata_t **tbrm_meta, /*!< out: table replication consistency
|
||||
metadata. */
|
||||
size_t *tbrm_rows); /*!< out: number of rows read */
|
||||
|
||||
/***********************************************************************//**
|
||||
Read table replication server metadata from the MySQL master server.
|
||||
This function assumes that necessary database and table are created.
|
||||
@return false if read failed, true if read succeeded */
|
||||
bool
|
||||
tbrm_read_server_metadata(
|
||||
/*======================*/
|
||||
const char *master_host, /*!< in: Master hostname */
|
||||
const char *user, /*!< in: username */
|
||||
const char *passwd, /*!< in: password */
|
||||
unsigned int master_port, /*!< in: master port */
|
||||
tbr_server_t **tbrm_server, /*!< out: table replication server
|
||||
metadata. */
|
||||
size_t *tbrm_rows); /*!< out: number of rows read */
|
||||
|
||||
/***********************************************************************//**
|
||||
Write table replication consistency metadata from the MySQL master server.
|
||||
This function assumes that necessary database and table are created.
|
||||
@return false if read failed, true if read succeeded */
|
||||
bool
|
||||
tbrm_write_consistency_metadata(
|
||||
/*============================*/
|
||||
const char *master_host, /*!< in: Master hostname */
|
||||
const char *user, /*!< in: username */
|
||||
const char *passwd, /*!< in: password */
|
||||
unsigned int master_port, /*!< in: master port */
|
||||
tbr_metadata_t **tbrm_meta, /*!< in: table replication consistency
|
||||
metadata. */
|
||||
size_t tbrm_rows); /*!< in: number of rows read */
|
||||
|
||||
/***********************************************************************//**
|
||||
Write table replication server metadata from the MySQL master server.
|
||||
This function assumes that necessary database and table are created.
|
||||
@return false if read failed, true if read succeeded */
|
||||
bool
|
||||
tbrm_write_server_metadata(
|
||||
/*=======================*/
|
||||
const char *master_host, /*!< in: Master hostname */
|
||||
const char *user, /*!< in: username */
|
||||
const char *passwd, /*!< in: password */
|
||||
unsigned int master_port, /*!< in: master port */
|
||||
tbr_server_t **tbrm_server, /*!< out: table replication server
|
||||
metadata. */
|
||||
size_t tbrm_rows); /*!< out: number of rows read */
|
||||
|
||||
|
||||
} // table_replication_metadata
|
||||
|
||||
} // mysql
|
||||
|
||||
#endif
|
||||
|
||||
|
@ -1,517 +0,0 @@
|
||||
/*
|
||||
Copyright (C) 2013, MariaDB Corporation Ab
|
||||
|
||||
|
||||
This file is distributed as part of the MariaDB Corporation MaxScale. It is free
|
||||
software: you can redistribute it and/or modify it under the terms of the
|
||||
GNU General Public License as published by the Free Software Foundation,
|
||||
version 2.
|
||||
|
||||
This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
||||
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
|
||||
details.
|
||||
|
||||
You should have received a copy of the GNU General Public License along with
|
||||
this program; if not, write to the Free Software Foundation, Inc., 51
|
||||
Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
|
||||
Author: Jan Lindström jan.lindstrom@mariadb.com
|
||||
|
||||
Created: 20-06-2013
|
||||
Updated:
|
||||
|
||||
*/
|
||||
|
||||
#include <stdio.h>
|
||||
#include <string.h>
|
||||
#include <ctype.h>
|
||||
#include <stdlib.h>
|
||||
|
||||
#include "table_replication_parser.h"
|
||||
#include "table_replication_consistency.h"
|
||||
#include "log_manager.h"
|
||||
|
||||
namespace mysql {
|
||||
|
||||
namespace table_replication_parser {
|
||||
|
||||
typedef struct {
|
||||
char* m_start;
|
||||
char* m_pos;
|
||||
} tb_parser_t;
|
||||
|
||||
/***********************************************************************//**
|
||||
This internal function initializes internal parser data structure based on
|
||||
string to be parsed.*/
|
||||
static void
|
||||
tbr_parser_init(
|
||||
/*============*/
|
||||
tb_parser_t* m, /*!< inout: Parser structure to initialize */
|
||||
const char* s) /*!< in: String to parse */
|
||||
{
|
||||
m->m_start = (char *)s;
|
||||
m->m_pos = (char *)s;
|
||||
}
|
||||
|
||||
/***********************************************************************//**
|
||||
This internal function skips all space characters on front
|
||||
@return position on string with next non space character*/
|
||||
static char*
|
||||
tbr_parser_skipwspc(
|
||||
char* str) /*!< in string */
|
||||
{
|
||||
while (isspace(*str)) {
|
||||
str++;
|
||||
}
|
||||
return(str);
|
||||
}
|
||||
|
||||
|
||||
/***********************************************************************//**
|
||||
This internal function parses input string and tries to match it to the given keyword.
|
||||
@return true if next keyword matches, false if not
|
||||
*/
|
||||
static bool
|
||||
tbr_match_keyword(
|
||||
/*==============*/
|
||||
tb_parser_t* m, /*!< inout: Parser structure */
|
||||
const char* const_str) /*!< in: Keyword to match */
|
||||
{
|
||||
size_t len;
|
||||
|
||||
m->m_pos = tbr_parser_skipwspc(m->m_pos);
|
||||
|
||||
if (const_str[0] == '\0') {
|
||||
return(m->m_pos[0] == '\0');
|
||||
}
|
||||
|
||||
len = strlen(const_str);
|
||||
|
||||
// Parsing is based on comparing two srings ignoring case
|
||||
if (strncasecmp(m->m_pos, const_str, len) == 0) {
|
||||
unsigned char c = (unsigned char)m->m_pos[len];
|
||||
if (isascii(c)) {
|
||||
if (isalnum(c)) {
|
||||
return (true);
|
||||
}
|
||||
if (c == '_') {
|
||||
return (false);
|
||||
}
|
||||
}
|
||||
m->m_pos += len;
|
||||
return(true);
|
||||
}
|
||||
return(false);
|
||||
}
|
||||
|
||||
/***********************************************************************//**
|
||||
Internal function to parse next quoted string
|
||||
@return true if quoted string found, false if not
|
||||
*/
|
||||
static bool
|
||||
tbr_get_quoted(
|
||||
/*===========*/
|
||||
tb_parser_t* m, /*!< inout: Parser structure */
|
||||
char* buf, /*!< out: parsed string */
|
||||
unsigned int size,/*!< in: buffer size */
|
||||
bool keep_quotes) /*!< in: is quotes left on string */
|
||||
{
|
||||
char quote;
|
||||
tb_parser_t saved_m;
|
||||
|
||||
m->m_pos = tbr_parser_skipwspc(m->m_pos);
|
||||
|
||||
saved_m = *m;
|
||||
|
||||
quote = *m->m_pos++;
|
||||
|
||||
if (keep_quotes) {
|
||||
*buf++ = quote;
|
||||
size--;
|
||||
}
|
||||
|
||||
while (*m->m_pos != '\0') {
|
||||
if (*m->m_pos == quote) {
|
||||
if ((m->m_pos)[1] == quote) {
|
||||
m->m_pos++;
|
||||
|
||||
if (keep_quotes) {
|
||||
*buf++ = quote;
|
||||
|
||||
if (size-- <= 1) {
|
||||
*m = saved_m;
|
||||
return(false);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
*buf++ = *m->m_pos++;
|
||||
|
||||
if (size-- <= 1) {
|
||||
*m = saved_m;
|
||||
return(true);
|
||||
}
|
||||
}
|
||||
|
||||
if (*m->m_pos != quote) {
|
||||
*m = saved_m;
|
||||
return(false);
|
||||
}
|
||||
|
||||
m->m_pos++;
|
||||
|
||||
if (keep_quotes) {
|
||||
*buf++ = quote;
|
||||
|
||||
if (size-- <= 1) {
|
||||
*m = saved_m;
|
||||
return(true);
|
||||
}
|
||||
}
|
||||
|
||||
*buf = '\0';
|
||||
|
||||
return(true);
|
||||
}
|
||||
|
||||
/***********************************************************************//**
|
||||
This internal function parses identifiers e.g. table name
|
||||
@return true if identifier is found, false if not
|
||||
*/
|
||||
static bool
|
||||
tbr_get_id(
|
||||
/*=======*/
|
||||
tb_parser_t* m, /*!< intout: Parser structure */
|
||||
char* id_buf, /*!< out: parsed identifier */
|
||||
unsigned int id_size) /*!< in: identifier size */
|
||||
{
|
||||
char* org_id_buf = id_buf;
|
||||
tb_parser_t saved_m;
|
||||
|
||||
m->m_pos = tbr_parser_skipwspc(m->m_pos);
|
||||
saved_m = *m;
|
||||
|
||||
if (*m->m_pos == '"' || *m->m_pos == '`') {
|
||||
if (!tbr_get_quoted(m, id_buf, id_size, false)) {
|
||||
*m = saved_m;
|
||||
return(false);
|
||||
}
|
||||
} else {
|
||||
while (isalnum(*m->m_pos) || *m->m_pos == '_') {
|
||||
*id_buf++ = *m->m_pos++;
|
||||
|
||||
if (id_size-- <= 1) {
|
||||
*m = saved_m;
|
||||
return(true);
|
||||
}
|
||||
}
|
||||
*id_buf = '\0';
|
||||
}
|
||||
|
||||
if (strlen(org_id_buf) > 0) {
|
||||
return(true);
|
||||
} else {
|
||||
*m = saved_m;
|
||||
return(false);
|
||||
}
|
||||
}
|
||||
|
||||
/***********************************************************************//**
|
||||
This internal function parses constants e.g. "."
|
||||
@return true if constant is found, false if not
|
||||
*/
|
||||
static bool
|
||||
tbr_match_const(
|
||||
/*============*/
|
||||
tb_parser_t* m, /*!< inout: Parser structure */
|
||||
const char* const_str) /*!< in: constant to be parsed */
|
||||
{
|
||||
size_t len;
|
||||
|
||||
m->m_pos = tbr_parser_skipwspc(m->m_pos);
|
||||
|
||||
if (const_str[0] == '\0') {
|
||||
return(m->m_pos[0] == '\0');
|
||||
}
|
||||
|
||||
len = strlen(const_str);
|
||||
|
||||
if (strncasecmp(m->m_pos, const_str, len) == 0) {
|
||||
m->m_pos += len;
|
||||
return(true);
|
||||
} else {
|
||||
return(false);
|
||||
}
|
||||
}
|
||||
|
||||
/***********************************************************************//**
|
||||
This internal function skips to position where given keyword is found
|
||||
@return true if keyword is found, false if not
|
||||
*/
|
||||
static bool
|
||||
tbr_skipto_keyword(
|
||||
/*===============*/
|
||||
tb_parser_t* m, /*!< inout: Parser structure */
|
||||
const char* const_str,/*!< in: keyword to find*/
|
||||
const char* end_str) /*!< in: stop at this keyword */
|
||||
{
|
||||
size_t len;
|
||||
bool more = true;
|
||||
|
||||
m->m_pos = tbr_parser_skipwspc(m->m_pos);
|
||||
|
||||
if (const_str[0] == '\0') {
|
||||
return(m->m_pos[0] == '\0');
|
||||
}
|
||||
|
||||
len = strlen(const_str);
|
||||
|
||||
while (more) {
|
||||
if (strncasecmp(m->m_pos, const_str, len) == 0) {
|
||||
m->m_pos += len;
|
||||
return(true);
|
||||
} else {
|
||||
if(!(tbr_match_const(m, (char *)end_str))) {
|
||||
m->m_pos++;
|
||||
|
||||
if (*(m->m_pos) == '\0'){
|
||||
return (false);
|
||||
}
|
||||
} else {
|
||||
m->m_pos-=strlen(end_str);
|
||||
return (false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return(true);
|
||||
}
|
||||
|
||||
/***********************************************************************//**
|
||||
This internal function parses table name consisting database + "." + table
|
||||
@return true if table name is found, false if not
|
||||
*/
|
||||
static bool
|
||||
tbr_get_tablename(
|
||||
/*==============*/
|
||||
tb_parser_t* m, /*!< inout: Parser structure */
|
||||
char* dbname_buf, /*!< out: Database name or empty string */
|
||||
size_t dbname_size, /*!< in: size of db buffer */
|
||||
char* tabname_buf, /*!< out: Tablename or empty string */
|
||||
size_t tabname_size) /*!< in: size of tablename buffer */
|
||||
{
|
||||
tb_parser_t saved_m;
|
||||
|
||||
saved_m = *m;
|
||||
|
||||
/* Try to parse database name */
|
||||
if (!tbr_get_id(m, dbname_buf, dbname_size)) {
|
||||
return(false);
|
||||
}
|
||||
|
||||
/* If string does not contain constant "." there is no database name */
|
||||
if (!tbr_match_const(m, (char *)".")) {
|
||||
*m = saved_m;
|
||||
dbname_buf[0] = '\0';
|
||||
|
||||
if (!tbr_get_id(m, tabname_buf, tabname_size)) {
|
||||
return(false);
|
||||
}
|
||||
return(true);
|
||||
}
|
||||
|
||||
/* Try to parser table name */
|
||||
if (!tbr_get_id(m, tabname_buf, tabname_size)) {
|
||||
return(false);
|
||||
}
|
||||
|
||||
return(true);
|
||||
}
|
||||
|
||||
/***********************************************************************//**
|
||||
This function parses SQL-clauses and extracts table names
|
||||
from the clause.
|
||||
@return true if table names found, false if not
|
||||
*/
|
||||
bool
|
||||
tbr_parser_table_names(
|
||||
/*===================*/
|
||||
char **db_name, /*!< inout: Array of db names */
|
||||
char **table_name, /*!< inout: Array of table names */
|
||||
int *n_tables, /*!< out: Number of db.table names found */
|
||||
const char* sql_string) /*!< in: SQL-clause */
|
||||
{
|
||||
tb_parser_t m;
|
||||
size_t name_count=0;
|
||||
char *dbname=NULL;
|
||||
char *tbname=NULL;
|
||||
size_t len = strlen(sql_string);
|
||||
|
||||
tbr_parser_init(&m, sql_string);
|
||||
*n_tables = 0;
|
||||
|
||||
// MySQL does not support multi-table insert or replace
|
||||
if ((tbr_match_keyword(&m, "INSERT") || tbr_match_keyword(&m, "REPLACE")) &&
|
||||
tbr_skipto_keyword(&m, "INTO", "")) {
|
||||
dbname = (char *)malloc(len+1);
|
||||
tbname = (char *)malloc(len+1);
|
||||
|
||||
if (tbr_get_tablename(&m, dbname, len, tbname, len)) {
|
||||
db_name[name_count] = dbname;
|
||||
table_name[name_count] = tbname;
|
||||
name_count++;
|
||||
|
||||
if (tbr_debug) {
|
||||
skygw_log_write_flush( LOGFILE_TRACE,
|
||||
(char *)"TRC Debug: INSERT OR REPLACE to %s.%s",
|
||||
dbname, tbname);
|
||||
}
|
||||
} else {
|
||||
free(dbname);
|
||||
free(tbname);
|
||||
return (false); // Parse error
|
||||
}
|
||||
}
|
||||
// MySQL does support multi table delete/update
|
||||
if ((tbr_match_keyword(&m, "DELETE") &&
|
||||
tbr_skipto_keyword(&m, "FROM","")) ||
|
||||
(tbr_match_keyword(&m, "UPDATE"))) {
|
||||
dbname = (char *)malloc(len+1);
|
||||
tbname = (char *)malloc(len+1);
|
||||
|
||||
// These will eat the optional keywords from update
|
||||
tbr_match_keyword(&m, "LOW PRIORITY");
|
||||
tbr_match_keyword(&m, "IGNORE");
|
||||
|
||||
// Parse the first db.table name
|
||||
if (tbr_get_tablename(&m, dbname, len,tbname,len)) {
|
||||
db_name[name_count] = dbname;
|
||||
table_name[name_count] = tbname;
|
||||
name_count++;
|
||||
|
||||
// Table names are delimited by ","
|
||||
while(tbr_match_const(&m, ",")) {
|
||||
dbname = (char *)malloc(len+1);
|
||||
tbname = (char *)malloc(len+1);
|
||||
// Parse the next db.table name
|
||||
if (tbr_get_tablename(&m, dbname, len,tbname,len)) {
|
||||
db_name[name_count] = dbname;
|
||||
table_name[name_count] = tbname;
|
||||
name_count++;
|
||||
|
||||
if (tbr_debug) {
|
||||
skygw_log_write_flush( LOGFILE_TRACE,
|
||||
(char *)"TRC Debug: DELETE OR UPDATE to %s.%s",
|
||||
dbname, tbname);
|
||||
}
|
||||
} else {
|
||||
free(dbname);
|
||||
free(tbname);
|
||||
return (false);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// LOAD command
|
||||
if (tbr_match_keyword(&m, "LOAD") &&
|
||||
tbr_skipto_keyword(&m, "INTO", "")) {
|
||||
|
||||
// Eat TABLE keyword
|
||||
tbr_match_keyword(&m, "TABLE");
|
||||
|
||||
dbname = (char *)malloc(len+1);
|
||||
tbname = (char *)malloc(len+1);
|
||||
|
||||
if (tbr_get_tablename(&m, dbname, len, tbname, len)) {
|
||||
db_name[name_count] = dbname;
|
||||
table_name[name_count] = tbname;
|
||||
name_count++;
|
||||
|
||||
if (tbr_debug) {
|
||||
skygw_log_write_flush( LOGFILE_TRACE,
|
||||
(char *)"TRC Debug: LOAD to %s.%s",
|
||||
dbname, tbname);
|
||||
}
|
||||
} else {
|
||||
free(dbname);
|
||||
free(tbname);
|
||||
return (false); // Parse error
|
||||
}
|
||||
}
|
||||
|
||||
// Create/Drop table
|
||||
if (tbr_match_keyword(&m, "CREATE") &&
|
||||
tbr_skipto_keyword(&m, "DROP", "")) {
|
||||
|
||||
// Eat TEMPORARY keyword
|
||||
tbr_match_keyword(&m, "TEMPORARY");
|
||||
|
||||
// Eat IF NOT EXISTS
|
||||
tbr_match_keyword(&m, "IF NOT EXISTS");
|
||||
|
||||
// Eat IF EXISTS
|
||||
tbr_match_keyword(&m, "IF EXISTS");
|
||||
|
||||
// Eat TABLE keyword
|
||||
tbr_match_keyword(&m, "TABLE");
|
||||
|
||||
dbname = (char *)malloc(len+1);
|
||||
tbname = (char *)malloc(len+1);
|
||||
|
||||
if (tbr_get_tablename(&m, dbname, len, tbname, len)) {
|
||||
db_name[name_count] = dbname;
|
||||
table_name[name_count] = tbname;
|
||||
name_count++;
|
||||
|
||||
if (tbr_debug) {
|
||||
// Table names are delimited by ","
|
||||
while(tbr_match_const(&m, ",")) {
|
||||
dbname = (char *)malloc(len+1);
|
||||
tbname = (char *)malloc(len+1);
|
||||
// Parse the next db.table name
|
||||
if (tbr_get_tablename(&m, dbname, len,tbname,len)) {
|
||||
db_name[name_count] = dbname;
|
||||
table_name[name_count] = tbname;
|
||||
name_count++;
|
||||
|
||||
if (tbr_debug) {
|
||||
skygw_log_write_flush( LOGFILE_TRACE,
|
||||
(char *)"TRC Debug: DROP TABLE to %s.%s",
|
||||
dbname, tbname);
|
||||
}
|
||||
} else {
|
||||
free(dbname);
|
||||
free(tbname);
|
||||
return (false);
|
||||
}
|
||||
}
|
||||
skygw_log_write_flush( LOGFILE_TRACE,
|
||||
(char *)"TRC Debug: CREATE/DROP TABLE to %s.%s",
|
||||
dbname, tbname);
|
||||
}
|
||||
} else {
|
||||
free(dbname);
|
||||
free(tbname);
|
||||
return (false); // Parse error
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
*n_tables = name_count;
|
||||
|
||||
if (name_count == 0) {
|
||||
return (false); // Parse error
|
||||
}
|
||||
|
||||
return (true);
|
||||
}
|
||||
|
||||
} // table_replication_parser
|
||||
|
||||
} // mysql
|
||||
|
@ -1,51 +0,0 @@
|
||||
/*
|
||||
Copyright (C) 2013-2014, MariaDB Corporation Ab
|
||||
|
||||
|
||||
This file is distributed as part of the MariaDB Corporation MaxScale. It is free
|
||||
software: you can redistribute it and/or modify it under the terms of the
|
||||
GNU General Public License as published by the Free Software Foundation,
|
||||
version 2.
|
||||
|
||||
This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
||||
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
|
||||
details.
|
||||
|
||||
You should have received a copy of the GNU General Public License along with
|
||||
this program; if not, write to the Free Software Foundation, Inc., 51
|
||||
Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
|
||||
Author: Jan Lindström jan.lindstrom@mariadb.com
|
||||
|
||||
Created: 20-06-2013
|
||||
Updated:
|
||||
|
||||
*/
|
||||
|
||||
#ifndef TABLE_REPLICATION_PARSER_H
|
||||
#define TABLE_REPLICATION_PARSER_H
|
||||
|
||||
namespace mysql {
|
||||
|
||||
namespace table_replication_parser {
|
||||
|
||||
/***********************************************************************//**
|
||||
This function parses SQL-clauses and extracts table names
|
||||
from the clause.
|
||||
@return true if table names found, false if not
|
||||
*/
|
||||
bool
|
||||
tbr_parser_table_names(
|
||||
/*===================*/
|
||||
char **db_name, /*!< inout: Array of db names */
|
||||
char **table_name, /*!< inout: Array of table names */
|
||||
int *n_tables, /*!< out: Number of db.table names found */
|
||||
const char* sql_string); /*!< in: SQL-clause */
|
||||
|
||||
} // table_replication_parser
|
||||
|
||||
} // mysql
|
||||
|
||||
#endif
|
||||
|
@ -1,35 +0,0 @@
|
||||
project (test)
|
||||
cmake_minimum_required (VERSION 2.6)
|
||||
|
||||
# --------- Find crypt
|
||||
FIND_LIBRARY(CRYPTO NAMES libcrypto.a /opt/local/lib /opt/lib /usr/lib /usr/local/lib /usr/local/ssl/lib)
|
||||
FIND_LIBRARY(SSL NAMES libssl.a /opt/local/lib /opt/lib /usr/lib /usr/local/lib /usr/local/ssl/lib)
|
||||
FIND_LIBRARY(REPLICATION replication /opt/local/lib /opt/lib /usr/lib /usr/local/lib ../)
|
||||
|
||||
LINK_DIRECTORIES(${Boost_LIBRARY_DIRS})
|
||||
INCLUDE_DIRECTORIES(${Boost_INCLUDE_DIR})
|
||||
|
||||
# Find MySQL client library and header files
|
||||
find_path(MySQL_INCLUDE_DIR mysql.h
|
||||
/usr/local/include/mysql /usr/include/mysql /usr/local/mysql/include)
|
||||
include_directories(${MySQL_INCLUDE_DIR})
|
||||
|
||||
#MariaDB Corporation
|
||||
find_path(MariaDB_Corporation_INCLUDE_DIR skygw_debug.h
|
||||
/usr/local/include /usr/include ../../utils)
|
||||
include_directories(${MariaDB_Corporation_INCLUDE_DIR})
|
||||
|
||||
find_path(TRC_INCLUDE_DIR table_replication_consistency.h
|
||||
../ /usr/include /usr/local/include)
|
||||
include_directories(${TRC_INCLUDE_DIR})
|
||||
|
||||
# Build rule for example
|
||||
foreach(prog Example)
|
||||
ADD_EXECUTABLE(${prog} ${prog}.c ../../utils/skygw_utils.o /usr/local/mysql/lib/libmysqld.a)
|
||||
TARGET_LINK_LIBRARIES(${prog} table_replication_consistency.a replication boost_system boost_thread pthread stdc++ ${SSL} ${CRYPTO} crypt z dl aio log_manager)
|
||||
endforeach()
|
||||
|
||||
foreach(prog test)
|
||||
ADD_EXECUTABLE(${prog} ${prog}.cpp ../../utils/skygw_utils.o /usr/local/mysql/lib/libmysqld.a)
|
||||
TARGET_LINK_LIBRARIES(${prog} table_replication_consistency.a replication boost_system boost_thread pthread stdc++ ${SSL} ${CRYPTO} crypt z dl aio log_manager)
|
||||
endforeach()
|
@ -1,89 +0,0 @@
|
||||
#include <getopt.h>
|
||||
#include <stdlib.h>
|
||||
#include <errno.h>
|
||||
#include <stdio.h>
|
||||
#include <string.h>
|
||||
#include <mysql.h>
|
||||
#ifndef bool
|
||||
#define bool int
|
||||
#endif
|
||||
#include "table_replication_consistency.h"
|
||||
|
||||
static char* server_options[] = {
|
||||
"jan test",
|
||||
"--datadir=/tmp/",
|
||||
"--skip-innodb",
|
||||
"--default-storage-engine=myisam",
|
||||
NULL
|
||||
};
|
||||
|
||||
const int num_elements = (sizeof(server_options) / sizeof(char *)) - 1;
|
||||
|
||||
static char* server_groups[] = {
|
||||
"embedded",
|
||||
"server",
|
||||
"server",
|
||||
"server",
|
||||
NULL
|
||||
};
|
||||
|
||||
int main(int argc, char** argv)
|
||||
{
|
||||
|
||||
int i=0,k=0;
|
||||
char *uri;
|
||||
replication_listener_t *mrl;
|
||||
int err=0;
|
||||
char *errstr=NULL;
|
||||
|
||||
// This will initialize MySQL
|
||||
if (mysql_server_init(num_elements, server_options, server_groups)) {
|
||||
printf("MySQL server init failed\n");
|
||||
exit(2);
|
||||
}
|
||||
|
||||
|
||||
mrl = (replication_listener_t*)calloc(argc, sizeof(replication_listener_t));
|
||||
|
||||
if (argc < 2) {
|
||||
printf("Usage: Example <uri> [<uri> ...]\n");
|
||||
exit(2);
|
||||
}
|
||||
|
||||
for(i=0; i < argc; i++) {
|
||||
uri= argv[i];
|
||||
|
||||
if ( strncmp("mysql://", uri, 8) == 0) {
|
||||
|
||||
mrl[k].server_url = malloc(strlen(uri)+1);
|
||||
strcpy(mrl[k].server_url, uri);
|
||||
k++;
|
||||
|
||||
if (argc == 1) {
|
||||
mrl[i].is_master = 1;
|
||||
}
|
||||
|
||||
}
|
||||
}//end of outer while loop
|
||||
|
||||
err = tb_replication_consistency_init(mrl, k, 5, TBR_TRACE_DEBUG);
|
||||
|
||||
if (err ) {
|
||||
perror(NULL);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
for(;;) {
|
||||
sleep(3);
|
||||
}
|
||||
|
||||
err = tb_replication_consistency_shutdown(&errstr);
|
||||
|
||||
if (*errstr) {
|
||||
fprintf(stderr, "%s\n", errstr);
|
||||
free(errstr);
|
||||
}
|
||||
|
||||
exit(0);
|
||||
|
||||
}
|
@ -1,44 +0,0 @@
|
||||
# Install script for directory: /home/jan/skysql/maxscale/table_replication_consistency/test
|
||||
|
||||
# Set the install prefix
|
||||
IF(NOT DEFINED CMAKE_INSTALL_PREFIX)
|
||||
SET(CMAKE_INSTALL_PREFIX "/usr/local")
|
||||
ENDIF(NOT DEFINED CMAKE_INSTALL_PREFIX)
|
||||
STRING(REGEX REPLACE "/$" "" CMAKE_INSTALL_PREFIX "${CMAKE_INSTALL_PREFIX}")
|
||||
|
||||
# Set the install configuration name.
|
||||
IF(NOT DEFINED CMAKE_INSTALL_CONFIG_NAME)
|
||||
IF(BUILD_TYPE)
|
||||
STRING(REGEX REPLACE "^[^A-Za-z0-9_]+" ""
|
||||
CMAKE_INSTALL_CONFIG_NAME "${BUILD_TYPE}")
|
||||
ELSE(BUILD_TYPE)
|
||||
SET(CMAKE_INSTALL_CONFIG_NAME "")
|
||||
ENDIF(BUILD_TYPE)
|
||||
MESSAGE(STATUS "Install configuration: \"${CMAKE_INSTALL_CONFIG_NAME}\"")
|
||||
ENDIF(NOT DEFINED CMAKE_INSTALL_CONFIG_NAME)
|
||||
|
||||
# Set the component getting installed.
|
||||
IF(NOT CMAKE_INSTALL_COMPONENT)
|
||||
IF(COMPONENT)
|
||||
MESSAGE(STATUS "Install component: \"${COMPONENT}\"")
|
||||
SET(CMAKE_INSTALL_COMPONENT "${COMPONENT}")
|
||||
ELSE(COMPONENT)
|
||||
SET(CMAKE_INSTALL_COMPONENT)
|
||||
ENDIF(COMPONENT)
|
||||
ENDIF(NOT CMAKE_INSTALL_COMPONENT)
|
||||
|
||||
# Install shared libraries without execute permission?
|
||||
IF(NOT DEFINED CMAKE_INSTALL_SO_NO_EXE)
|
||||
SET(CMAKE_INSTALL_SO_NO_EXE "1")
|
||||
ENDIF(NOT DEFINED CMAKE_INSTALL_SO_NO_EXE)
|
||||
|
||||
IF(CMAKE_INSTALL_COMPONENT)
|
||||
SET(CMAKE_INSTALL_MANIFEST "install_manifest_${CMAKE_INSTALL_COMPONENT}.txt")
|
||||
ELSE(CMAKE_INSTALL_COMPONENT)
|
||||
SET(CMAKE_INSTALL_MANIFEST "install_manifest.txt")
|
||||
ENDIF(CMAKE_INSTALL_COMPONENT)
|
||||
|
||||
FILE(WRITE "/home/jan/skysql/maxscale/table_replication_consistency/test/${CMAKE_INSTALL_MANIFEST}" "")
|
||||
FOREACH(file ${CMAKE_INSTALL_MANIFEST_FILES})
|
||||
FILE(APPEND "/home/jan/skysql/maxscale/table_replication_consistency/test/${CMAKE_INSTALL_MANIFEST}" "${file}\n")
|
||||
ENDFOREACH(file)
|
@ -1,98 +0,0 @@
|
||||
#include <getopt.h>
|
||||
#include <stdlib.h>
|
||||
#include <errno.h>
|
||||
#include <stdio.h>
|
||||
#include <string.h>
|
||||
#include <mysql.h>
|
||||
#ifndef bool
|
||||
#define bool int
|
||||
#endif
|
||||
#include "table_replication_consistency.h"
|
||||
#include "../log_manager/log_manager.h"
|
||||
|
||||
static char* server_options[] = {
|
||||
(char *)"jtest",
|
||||
(char *)"--datadir=/tmp",
|
||||
(char *)"--skip-innodb",
|
||||
(char *)"--default-storage-engine=myisam",
|
||||
NULL
|
||||
};
|
||||
|
||||
const int num_elements = (sizeof(server_options) / sizeof(char *)) - 1;
|
||||
|
||||
static char* server_groups[] = { (char *)"libmysqld_server",
|
||||
(char *)"libmysqld_client",
|
||||
(char *)"libmysqld_server",
|
||||
(char *)"libmysqld_server", NULL };
|
||||
|
||||
|
||||
int main(int argc, char** argv)
|
||||
{
|
||||
|
||||
int i=0,k=0;
|
||||
char *uri;
|
||||
replication_listener_t *mrl;
|
||||
int err=0;
|
||||
char *errstr=NULL;
|
||||
|
||||
// This will initialize MySQL
|
||||
if (mysql_library_init(num_elements, server_options, server_groups)) {
|
||||
printf("MySQL server init failed\n");
|
||||
exit(2);
|
||||
}
|
||||
|
||||
|
||||
mrl = (replication_listener_t*)calloc(argc, sizeof(replication_listener_t));
|
||||
|
||||
if (argc < 2) {
|
||||
printf("Usage: Example <uri> [<uri> ...]\n");
|
||||
exit(2);
|
||||
}
|
||||
|
||||
for(i=0; i < argc; i++) {
|
||||
uri= argv[i];
|
||||
|
||||
if ( strncmp("mysql://", uri, 8) == 0) {
|
||||
|
||||
mrl[k].server_url = (char *)malloc(strlen(uri)+1);
|
||||
strcpy(mrl[k].server_url, uri);
|
||||
|
||||
if (k == 0) {
|
||||
mrl[k].is_master = 1;
|
||||
}
|
||||
k++;
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
const char *opts[] = {
|
||||
(char *)"test",
|
||||
(char *)"-g",
|
||||
(char *)"/home/jan/",
|
||||
NULL
|
||||
};
|
||||
|
||||
skygw_logmanager_init(3, (char **)&opts);
|
||||
|
||||
err = tb_replication_consistency_init(mrl, k, 5, TBR_TRACE_DEBUG);
|
||||
|
||||
if (err ) {
|
||||
perror(NULL);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
// This will allow the server to start
|
||||
for(;;) {
|
||||
sleep(10);
|
||||
}
|
||||
|
||||
err = tb_replication_consistency_shutdown(&errstr);
|
||||
|
||||
if (*errstr) {
|
||||
fprintf(stderr, "%s\n", errstr);
|
||||
free(errstr);
|
||||
}
|
||||
|
||||
exit(0);
|
||||
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user