427 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			427 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/*
Copyright (C) 2013, MariaDB Corporation Ab

This file is distributed as part of the MariaDB Corporation MaxScale. It is free
software: you can redistribute it and/or modify it under the terms of the
GNU General Public License as published by the Free Software Foundation,
version 2.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
details.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc., 51
Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

Author: Jan Lindström jan.lindstrom@skysql.com

Created: 20-06-2013
Updated:

*/
 | 
						|
#include <iostream>
 | 
						|
#include <stdlib.h>
 | 
						|
#include <errno.h>
 | 
						|
#include <string.h>
 | 
						|
#include <regex.h>
 | 
						|
#include <algorithm>
 | 
						|
#include <sstream>
 | 
						|
#include <boost/system/system_error.hpp>
 | 
						|
#include "table_replication_consistency.h"
 | 
						|
#include "table_replication_listener.h"
 | 
						|
#include "table_replication_metadata.h"
 | 
						|
#include "listener_exception.h"
 | 
						|
#include "log_manager.h"
 | 
						|
 | 
						|
/* Global memory */

/* Thread identifiers of the replication stream reader threads. The array
is allocated in tb_replication_consistency_init(). */
pthread_t *replication_listener_tid = NULL;

/* Thread identifier of the metadata updater thread that is started for
the master server in tb_replication_consistency_init(). */
pthread_t *replication_listener_metadata_tid=NULL;

/* Number of replication listeners recorded by
tb_replication_consistency_init(); used as the loop bound at shutdown. */
unsigned int n_replication_listeners = 0;

bool listener_shutdown=false;          /* This flag will be true
				       at shutdown */

/* Compile-time defaults for the trace flags. Both flags are defined in
every configuration: previously a build with only TBR_TRACE defined left
tbr_debug undefined although it is assigned at run time. */
#ifdef TBR_DEBUG
bool tbr_trace = true;
bool tbr_debug = true;
#else
#ifdef TBR_TRACE
bool tbr_trace = true;
bool tbr_debug = false;
#else
bool tbr_trace = false;
bool tbr_debug = false;
#endif
#endif
 | 
						|
 | 
						|
/* Namespaces */
 | 
						|
 | 
						|
using namespace std;
 | 
						|
using namespace mysql;
 | 
						|
using namespace table_replication_listener;
 | 
						|
using namespace table_replication_metadata;
 | 
						|
 | 
						|
/***********************************************************************//**
 | 
						|
This function will register replication listener for every server
 | 
						|
provided and initialize all internal data structures and starts listening
 | 
						|
the replication stream.
 | 
						|
@return 0 on success, error code at failure. */
 | 
						|
int
 | 
						|
tb_replication_consistency_init(
 | 
						|
/*============================*/
 | 
						|
	replication_listener_t *rpl,              /*!< in: Server
 | 
						|
						  definition. */
 | 
						|
	size_t                 n_servers,         /*!< in: Number of servers */
 | 
						|
	unsigned int           gateway_server_id, /*!< in: MaxScale slave
 | 
						|
						  server id. */
 | 
						|
	int                    trace_level)       /*!< in: Trace level */
 | 
						|
{
 | 
						|
	boost::uint32_t i;
 | 
						|
	int err = 0;
 | 
						|
	string errmsg="";
 | 
						|
 | 
						|
	// Allocate memory for thread identifiers
 | 
						|
	replication_listener_tid = (pthread_t*)malloc(sizeof(pthread_t) * (n_servers + 1));
 | 
						|
 | 
						|
	if (replication_listener_tid == NULL) {
 | 
						|
		errmsg = string("Fatal: Table_Replication_Consistency: out of memory");
 | 
						|
		goto error_handling;
 | 
						|
	}
 | 
						|
 | 
						|
	replication_listener_metadata_tid = (pthread_t*)malloc(sizeof(pthread_t));
 | 
						|
 | 
						|
	if (replication_listener_metadata_tid == NULL) {
 | 
						|
		free(replication_listener_tid);
 | 
						|
		errmsg = string("Fatal: Table_Replication_Consistency: out of memory");
 | 
						|
		goto error_handling;
 | 
						|
	}
 | 
						|
 | 
						|
	// Set up trace level
 | 
						|
	if (trace_level & TBR_TRACE_DEBUG) {
 | 
						|
		tbr_debug = true;
 | 
						|
	}
 | 
						|
 | 
						|
	if (trace_level & TBR_TRACE_TRACE) {
 | 
						|
		tbr_trace = true;
 | 
						|
	}
 | 
						|
 | 
						|
	// Find out the master server
 | 
						|
	for(i=0;i < n_servers; i++) {
 | 
						|
		if (rpl[i].is_master) {
 | 
						|
			break;
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	// If master is found read metadata from MySQL server, if not report error
 | 
						|
	if (i < n_servers) {
 | 
						|
		char *errm = NULL;
 | 
						|
		if(!tb_replication_listener_init(&(rpl[i]), &errm)) {
 | 
						|
			errmsg = std::string(errm);
 | 
						|
			free(errm);
 | 
						|
		}
 | 
						|
	} else {
 | 
						|
		errmsg = string("Master server is missing from configuration");
 | 
						|
		goto error_handling;
 | 
						|
	}
 | 
						|
 | 
						|
	// Start replication stream reader thread for every server in the configuration
 | 
						|
	for(i=0;i < n_servers; i++) {
 | 
						|
		// We need to try catch all exceptions here because function
 | 
						|
		// calling this service could be implemented using C-language
 | 
						|
		// thus we need to protect it.
 | 
						|
		try {
 | 
						|
			rpl[i].gateway_slave_server_id = gateway_server_id;
 | 
						|
			rpl[i].listener_id = i;
 | 
						|
 | 
						|
			// For master we start also metadata updater
 | 
						|
			if (rpl[i].is_master) {
 | 
						|
				err = pthread_create(
 | 
						|
					replication_listener_metadata_tid,
 | 
						|
					NULL,
 | 
						|
					&(tb_replication_listener_metadata_updater),
 | 
						|
					(void *)&(rpl[i]));
 | 
						|
 | 
						|
				if (err) {
 | 
						|
					errmsg = string(strerror(err));
 | 
						|
					goto error_handling;
 | 
						|
				}
 | 
						|
			}
 | 
						|
 | 
						|
			// Start actual replication listener
 | 
						|
			err = pthread_create(
 | 
						|
				&replication_listener_tid[i],
 | 
						|
				NULL,
 | 
						|
				&(tb_replication_listener_reader),
 | 
						|
				(void *) &(rpl[i]));
 | 
						|
 | 
						|
			if (err) {
 | 
						|
				errmsg = string(strerror(err));
 | 
						|
				goto error_handling;
 | 
						|
			}
 | 
						|
 | 
						|
		}
 | 
						|
		// Replication listener will use this exception for
 | 
						|
		// error handling.
 | 
						|
		catch(ListenerException const& e)
 | 
						|
		{
 | 
						|
			errmsg = e.what();
 | 
						|
			goto error_handling;
 | 
						|
		}
 | 
						|
		// Boost library exceptions
 | 
						|
		catch(boost::system::error_code const& e)
 | 
						|
		{
 | 
						|
			errmsg = e.message();
 | 
						|
			goto error_handling;
 | 
						|
		}
 | 
						|
		// Try and catch all exceptions
 | 
						|
		catch(std::exception const& e)
 | 
						|
		{
 | 
						|
			errmsg = e.what();
 | 
						|
			goto error_handling;
 | 
						|
		}
 | 
						|
		// Rest of them
 | 
						|
		catch(...)
 | 
						|
		{
 | 
						|
			errmsg = std::string("Unknown exception: ");
 | 
						|
			goto error_handling;
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	// Number of threads
 | 
						|
	n_replication_listeners = i;
 | 
						|
	/* We will try to join the threads at shutdown */
 | 
						|
	return (0);
 | 
						|
 | 
						|
 error_handling:
 | 
						|
	n_replication_listeners = i;
 | 
						|
	rpl[i].error_message = (char *)malloc(errmsg.size()+1);
 | 
						|
	strcpy(rpl[i].error_message, errmsg.c_str());
 | 
						|
 | 
						|
	// This will log error to log file
 | 
						|
	skygw_log_write_flush( LOGFILE_ERROR, (char *)errmsg.c_str());
 | 
						|
 | 
						|
	return (1);
 | 
						|
}
 | 
						|
 | 
						|
/***********************************************************************//**
 | 
						|
With this fuction client can request table consistency status for a
 | 
						|
single table. As a return client will receive a number of consistency
 | 
						|
status structures. Client must allocate memory for consistency result
 | 
						|
array and provide the maximum number of values returned. At return
 | 
						|
there is information how many results where available.
 | 
						|
@return 0 on success, error code at failure. */
 | 
						|
int
 | 
						|
tb_replication_consistency_query(
 | 
						|
/*=============================*/
 | 
						|
	table_consistency_query_t *tb_query, /*!< in: Table consistency
 | 
						|
					     query. */
 | 
						|
	table_consistency_t *tb_consistency, /*!< in: Table consistency
 | 
						|
					     status structure.*/
 | 
						|
	size_t *n_servers)                   /*!< inout: Number of
 | 
						|
					     servers where to get table
 | 
						|
					     consistency status. Out: Number
 | 
						|
					     of successfull consistency
 | 
						|
					     query results. */
 | 
						|
{
 | 
						|
	int err = 0;
 | 
						|
	boost::uint32_t i = 0;
 | 
						|
	std::string errmsg ="";
 | 
						|
 | 
						|
	// We need to protect C client from exceptions here
 | 
						|
	try {
 | 
						|
		for(i = 0; i < *n_servers; i++) {
 | 
						|
			err = tb_replication_listener_consistency((const unsigned char *)tb_query->db_dot_table, &tb_consistency[i], i);
 | 
						|
 | 
						|
			if (err) {
 | 
						|
				goto err_exit;
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
	catch(mysql::ListenerException const& e)
 | 
						|
	{
 | 
						|
		errmsg = e.what();
 | 
						|
		goto error_handling;
 | 
						|
	}
 | 
						|
	catch(boost::system::error_code const& e)
 | 
						|
	{
 | 
						|
		errmsg = e.message();
 | 
						|
		goto error_handling;
 | 
						|
	}
 | 
						|
	// Try and catch all exceptions
 | 
						|
	catch(std::exception const& e)
 | 
						|
	{
 | 
						|
		errmsg = e.what();
 | 
						|
		goto error_handling;
 | 
						|
	}
 | 
						|
	// Rest of them
 | 
						|
	catch(...)
 | 
						|
	{
 | 
						|
		errmsg = std::string("Unknown exception: ");
 | 
						|
		goto error_handling;
 | 
						|
	}
 | 
						|
 | 
						|
	*n_servers = i;
 | 
						|
	return (err);
 | 
						|
 | 
						|
error_handling:
 | 
						|
	tb_consistency[i].error_message = (char *)malloc(errmsg.size()+1);
 | 
						|
	strcpy(tb_consistency[i].error_message, errmsg.c_str());
 | 
						|
	// This will log error to log file
 | 
						|
	skygw_log_write_flush( LOGFILE_ERROR, (char *)errmsg.c_str());
 | 
						|
 | 
						|
err_exit:
 | 
						|
	*n_servers=i-1;
 | 
						|
	tb_consistency[i].error_code = err;
 | 
						|
 | 
						|
	return (1);
 | 
						|
}
 | 
						|
 | 
						|
/***********************************************************************//**
 | 
						|
This function will reconnect replication listener to a server
 | 
						|
provided.
 | 
						|
@return 0 on success, error code at failure. */
 | 
						|
int
 | 
						|
tb_replication_consistency_reconnect(
 | 
						|
/*=================================*/
 | 
						|
	replication_listener_t* rpl,     /*!< in: Server definition.*/
 | 
						|
	unsigned int gateway_server_id)  /*!< in: MaxScale slave
 | 
						|
					 server id. */
 | 
						|
{
 | 
						|
	std::string errmsg ="";
 | 
						|
	int err = 0;
 | 
						|
 | 
						|
	rpl->gateway_slave_server_id = gateway_server_id;
 | 
						|
 | 
						|
	// We need to protect C client from exceptions here
 | 
						|
	try {
 | 
						|
		err = tb_replication_listener_reconnect(rpl, &replication_listener_tid[rpl->listener_id]);
 | 
						|
 | 
						|
		if (err) {
 | 
						|
			goto err_exit;
 | 
						|
		}
 | 
						|
	}
 | 
						|
	catch(ListenerException const& e)
 | 
						|
	{
 | 
						|
		errmsg = e.what();
 | 
						|
		goto error_handling;
 | 
						|
	}
 | 
						|
	catch(boost::system::error_code const& e)
 | 
						|
	{
 | 
						|
		errmsg = e.message();
 | 
						|
		goto error_handling;
 | 
						|
	}
 | 
						|
	// Try and catch all exceptions
 | 
						|
	catch(std::exception const& e)
 | 
						|
	{
 | 
						|
		errmsg = e.what();
 | 
						|
		goto error_handling;
 | 
						|
	}
 | 
						|
	// Rest of them
 | 
						|
	catch(...)
 | 
						|
	{
 | 
						|
		errmsg = std::string("Unknown exception: ");
 | 
						|
		goto error_handling;
 | 
						|
	}
 | 
						|
 | 
						|
	return (err);
 | 
						|
 | 
						|
error_handling:
 | 
						|
	rpl->error_message = (char *)malloc(errmsg.size()+1);
 | 
						|
	strcpy(rpl->error_message, errmsg.c_str());
 | 
						|
	// This will log error to log file
 | 
						|
	skygw_log_write_flush( LOGFILE_ERROR, (char *)errmsg.c_str());
 | 
						|
 | 
						|
err_exit:
 | 
						|
	return (1);
 | 
						|
}
 | 
						|
 | 
						|
/***********************************************************************//**
 | 
						|
This function is to shutdown the replication listener and free all
 | 
						|
resources on table consistency. This function will store
 | 
						|
the current status on metadata to MySQL server.
 | 
						|
@return 0 on success, error code at failure. */
 | 
						|
int
 | 
						|
tb_replication_consistency_shutdown(
 | 
						|
/*================================*/
 | 
						|
	char ** error_message)          /*!< out: error message */
 | 
						|
{
 | 
						|
	int err = 0;
 | 
						|
	boost::uint32_t i = 0;
 | 
						|
	std::string errmsg ="";
 | 
						|
 | 
						|
	// We need to protect C client from exceptions here
 | 
						|
	try {
 | 
						|
 | 
						|
		// Wait until all replication listeners are shut down
 | 
						|
		for(i = 0; i < n_replication_listeners; i++) {
 | 
						|
 | 
						|
			err = tb_replication_listener_shutdown(i, error_message);
 | 
						|
 | 
						|
			if (err) {
 | 
						|
				goto err_exit;
 | 
						|
			}
 | 
						|
 | 
						|
			// Need to wait until the thread exits
 | 
						|
			err = pthread_join(replication_listener_tid[i], (void **)error_message);
 | 
						|
 | 
						|
			if (err) {
 | 
						|
				goto err_exit;
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		listener_shutdown = true;
 | 
						|
 | 
						|
		// Wait until metadata writer has shut down
 | 
						|
		err = pthread_join(*replication_listener_metadata_tid, NULL);
 | 
						|
 | 
						|
		if (err) {
 | 
						|
			goto err_exit;
 | 
						|
		}
 | 
						|
 | 
						|
		// Write metadata to MySQL storage and clean up
 | 
						|
		err = tb_replication_listener_done(error_message);
 | 
						|
 | 
						|
		if (err) {
 | 
						|
			goto err_exit;
 | 
						|
		}
 | 
						|
	}
 | 
						|
	catch(mysql::ListenerException const& e)
 | 
						|
	{
 | 
						|
		errmsg = e.what();
 | 
						|
		goto error_handling;
 | 
						|
	}
 | 
						|
	catch(boost::system::error_code const& e)
 | 
						|
	{
 | 
						|
		errmsg = e.message();
 | 
						|
		goto error_handling;
 | 
						|
	}
 | 
						|
	// Try and catch all exceptions
 | 
						|
	catch(std::exception const& e)
 | 
						|
	{
 | 
						|
		errmsg = e.what();
 | 
						|
		goto error_handling;
 | 
						|
	}
 | 
						|
	// Rest of them
 | 
						|
	catch(...)
 | 
						|
	{
 | 
						|
		errmsg = std::string("Unknown exception: ");
 | 
						|
		goto error_handling;
 | 
						|
	}
 | 
						|
 | 
						|
	return (err);
 | 
						|
 | 
						|
error_handling:
 | 
						|
	*error_message = (char *)malloc(errmsg.size()+1);
 | 
						|
	strcpy(*error_message, errmsg.c_str());
 | 
						|
	// This will log error to log file
 | 
						|
	skygw_log_write_flush( LOGFILE_ERROR, (char *)errmsg.c_str());
 | 
						|
 | 
						|
err_exit:
 | 
						|
	return (1);
 | 
						|
 | 
						|
}
 |