This commit is contained in:
vraatikka
2013-07-23 08:36:26 +03:00
10 changed files with 748 additions and 63 deletions

View File

@ -183,10 +183,12 @@ class Gtid_event: public Binary_log_event
{
public:
Gtid_event(Log_event_header *header) : Binary_log_event(header) {}
size_t gtid_length() { return MYSQL_GTID_ENCODED_SIZE;}
boost::uint32_t domain_id;
boost::uint32_t server_id;
boost::uint64_t sequence_number;
char m_mysql_gtid[MYSQL_GTID_ENCODED_SIZE];
unsigned char m_mysql_gtid[MYSQL_GTID_ENCODED_SIZE];
Gtid m_gtid;
};

View File

@ -50,8 +50,9 @@ class Gtid
{
public:
Gtid()
: m_real_gtid(false), m_domain_id(0), m_server_id(0), m_sequence_number(0), m_server_type(MYSQL_SERVER_TYPE_NA)
Gtid()
: m_real_gtid(false), m_domain_id(0), m_server_id(0), m_sequence_number(0),
m_server_type(MYSQL_SERVER_TYPE_NA), m_gtid_length(0)
{
memset(m_mysql_gtid, 0, MYSQL_GTID_ENCODED_SIZE);
}
@ -60,16 +61,21 @@ class Gtid
const boost::uint32_t server_id,
const boost::uint64_t sequence_number);
Gtid(const char *mysql_gtid,
Gtid(const unsigned char *mysql_gtid,
const boost::uint64_t gno);
Gtid(const char *mysql_gtid);
Gtid(const unsigned char *mysql_gtid);
~Gtid() {}
bool is_real_gtid() const { return m_real_gtid;}
const char* get_mysql_gtid() const { return m_mysql_gtid; }
const unsigned char* get_mysql_gtid() const { return m_mysql_gtid; }
const unsigned char* get_gtid() const;
size_t get_gtid_length() const { return m_gtid_length; }
std::string get_string() const;
@ -85,8 +91,10 @@ class Gtid
boost::uint32_t m_domain_id;
boost::uint32_t m_server_id;
boost::uint64_t m_sequence_number;
boost::uint32_t m_gtid_length;
char m_mysql_gtid[MYSQL_GTID_ENCODED_SIZE];
unsigned char m_mysql_gtid[MYSQL_GTID_ENCODED_SIZE];
std::string m_mariadb_gtid;
};
}

View File

@ -42,25 +42,30 @@ namespace mysql
m_server_type(MYSQL_SERVER_TYPE_MARIADB)
{
memset(m_mysql_gtid, 0, MYSQL_GTID_ENCODED_SIZE);
m_mariadb_gtid = to_string(m_domain_id) + std::string("-") + to_string(m_server_id) + std::string("-") + to_string(m_sequence_number);
m_gtid_length = m_mariadb_gtid.length();
}
Gtid::Gtid(const char *mysql_gtid,
Gtid::Gtid(const unsigned char *mysql_gtid,
const boost::uint64_t gno)
:m_real_gtid(true),
m_domain_id(0),
m_server_id(0),
m_sequence_number(gno),
m_server_type(MYSQL_SERVER_TYPE_MYSQL)
m_server_type(MYSQL_SERVER_TYPE_MYSQL),
m_gtid_length(MYSQL_GTID_ENCODED_SIZE)
{
memcpy(m_mysql_gtid, mysql_gtid, MYSQL_GTID_ENCODED_SIZE);
}
Gtid::Gtid(const char* mysql_gtid)
Gtid::Gtid(const unsigned char* mysql_gtid)
:m_real_gtid(true),
m_domain_id(0),
m_server_id(0),
m_sequence_number(0),
m_server_type(MYSQL_SERVER_TYPE_MYSQL)
m_server_type(MYSQL_SERVER_TYPE_MYSQL),
m_gtid_length(MYSQL_GTID_ENCODED_SIZE)
{
int i,k;
char tmp[2];
@ -84,10 +89,10 @@ namespace mysql
std::string Gtid::get_string() const
{
if (m_server_type == MYSQL_SERVER_TYPE_MARIADB) {
return (to_string(m_domain_id) + std::string("-") + to_string(m_server_id) + std::string("-") + to_string(m_sequence_number));
return (m_mariadb_gtid);
} else {
std::string hexs;
char *sid = (char *)m_mysql_gtid;
unsigned char *sid = (unsigned char *)m_mysql_gtid;
char tmp[2];
// Dump the encoded SID using hexadesimal representation
@ -99,6 +104,15 @@ namespace mysql
return(hexs + std::string(":") + to_string(m_sequence_number));
}
}
const unsigned char* Gtid::get_gtid() const
{
if (m_server_type == MYSQL_SERVER_TYPE_MARIADB) {
return ((const unsigned char *)m_mariadb_gtid.c_str());
} else {
return (m_mysql_gtid);
}
}
}

View File

@ -5,7 +5,10 @@ cmake_minimum_required (VERSION 2.6)
# the library.
set(table_replication_consistency_sources
table_replication_consistency.cpp table_replication_listener.cpp table_replication_parser.cpp)
table_replication_consistency.cpp
table_replication_listener.cpp
table_replication_parser.cpp
table_replication_metadata.cpp)
# ---------- Find Boost Headers/Libraries -----------------------
SET(Boost_DEBUG FALSE)

View File

@ -52,12 +52,12 @@ This function will register replication listener for every server
provided and initialize all internal data structures and starts listening
the replication stream.
@return 0 on success, error code at failure. */
int
int
tb_replication_consistency_init(
/*============================*/
replication_listener_t *rpl, /*!< in: Server
definition. */
int n_servers, /*!< in: Number of servers */
size_t n_servers, /*!< in: Number of servers */
unsigned int gateway_server_id) /*!< in: Gateway slave
server id. */
{
@ -118,7 +118,7 @@ tb_replication_consistency_init(
error_handling:
n_replication_listeners = i;
rpl[i].error_message = (char *)malloc(errmsg.size()+1);
rpl[i].error_message = (char *)malloc(errmsg.size()+1);
strcpy(rpl[i].error_message, errmsg.c_str());
return (1);
}
@ -130,14 +130,14 @@ status structures. Client must allocate memory for consistency result
array and provide the maximum number of values returned. At return
there is information how many results where available.
@return 0 on success, error code at failure. */
int
int
tb_replication_consistency_query(
/*=============================*/
table_consistency_query_t *tb_query, /*!< in: Table consistency
query. */
query. */
table_consistency_t *tb_consistency, /*!< in: Table consistency
status structure.*/
int *n_servers) /*!< inout: Number of
size_t *n_servers) /*!< inout: Number of
servers where to get table
consistency status. Out: Number
of successfull consistency
@ -258,8 +258,8 @@ the current status on metadata to MySQL server.
@return 0 on success, error code at failure. */
int
tb_replication_consistency_shutdown(
char ** error_message) /*!< out: error message */
/*================================*/
char ** error_message) /*!< out: error message */
{
int err = 0;
boost::uint32_t i = 0;

View File

@ -44,11 +44,12 @@ typedef struct {
int use_binlog_pos; /*!< in: 1 if binlog position
should be used for binlog start
position. */
char *gtid; /*!< in: Global transaction identifier
unsigned char *gtid; /*!< in: Global transaction identifier
or NULL */
size_t gtid_length; /*!< in: Real size of GTID */
int is_master; /*!< in: Is this server a master 1 =
yes, 0 = no. */
int gateway_slave_server_id; /*!< in: replication listener slave
int gateway_slave_server_id; /*!< in: replication listener slave
server id. */
int listener_id; /*!< in: listener id */
int connection_suggesfull; /*!< out: 0 if connection successfull
@ -59,13 +60,13 @@ typedef struct {
/* Structure definition for table consistency query */
typedef struct table_consistency_query {
char *db_dot_table; /*!< in: Fully qualified database and
unsigned char *db_dot_table; /*!< in: Fully qualified database and
table, e.g. Production.Orders. */
} table_consistency_query_t;
/* Structure definition for table consistency result */
typedef struct table_consistency {
char *db_dot_table; /*!< out: Fully qualified database and
unsigned char *db_dot_table;/*!< out: Fully qualified database and
table, e.g. Production.Orders. */
unsigned int server_id; /*!< out: Server id where the consitency
information is from. */
@ -75,8 +76,9 @@ typedef struct table_consistency {
transaction id is known. */
unsigned long binlog_pos; /*!< out: Last seen binlog position
on this server. */
char *gtid; /*!< out: If global transacition id
unsigned char *gtid; /*!< out: If global transacition id
is known, will contain the id or NULL. */
size_t gtid_length; /*!< out: Real length of GTID */
int error_code; /*!< out: 0 if table consistency query
for this server succesfull or error
code. */
@ -85,6 +87,7 @@ typedef struct table_consistency {
server failed. */
} table_consistency_t;
EXTERN_C_BLOCK_BEGIN
/* Interface functions */
@ -94,12 +97,12 @@ This function will register replication listener for every server
provided and initialize all internal data structures and starts listening
the replication stream.
@return 0 on success, error code at failure. */
int
int
tb_replication_consistency_init(
/*============================*/
replication_listener_t *rpl, /*!< in: Server
definition. */
int n_servers, /*!< in: Number of servers */
size_t n_servers, /*!< in: Number of servers */
unsigned int gateway_server_id);/*!< in: Gateway slave
server id. */
@ -110,14 +113,14 @@ status structures. Client must allocate memory for consistency result
array and provide the maximum number of values returned. At return
there is information how many results where available.
@return 0 on success, error code at failure. */
int
int
tb_replication_consistency_query(
/*=============================*/
table_consistency_query_t *tb_query, /*!< in: Table consistency
query. */
query. */
table_consistency_t *tb_consistency, /*!< in: Table consistency
status structure.*/
int *n_servers); /*!< inout: Number of
size_t *n_servers); /*!< inout: Number of
servers where to get table
consistency status. Out: Number
of successfull consistency

View File

@ -37,6 +37,7 @@ Updated:
#include "table_replication_consistency.h"
#include "table_replication_listener.h"
#include "table_replication_parser.h"
#include "table_replication_metadata.h"
using mysql::Binary_log;
using mysql::system::create_transport;
@ -49,32 +50,27 @@ namespace mysql {
namespace table_replication_listener {
/* Table Consistency data structure */
typedef struct {
char* database_dot_table; /* Fully qualified db.table name,
primary key. */
boost::uint32_t server_id; /* Server id */
char* gtid; /* Global transaction id */
boost::uint64_t binlog_pos; /* Binlog position */
bool gtid_known; /* Is gtid known ? */
} table_listener_consistency_t;
/* STL multimap containing the consistency information. Multimap is used
because same table can be found from several servers. */
multimap<std::string, table_listener_consistency_t*> table_consistency_map;
multimap<std::string, tbr_metadata_t*> table_consistency_map;
boost::mutex table_consistency_mutex; /* This mutex is used protect
abve data structure from
above data structure from
multiple threads */
/* We use this map to store constructed binary log connections */
/* We use this map to store constructed binary log connections */
map<int, Binary_log*> table_replication_listeners;
boost::mutex table_replication_mutex; /* This mutex is used protect
abve data structure from
multiple threads */
bool listener_shutdown = false; /* This flag will be true
at shutdown */
pthread_t metadata_updater_pid; /* Thread id for metadata
updater*/
/***********************************************************************//**
Internal function to update table consistency information based
on log event header, table name and if GTID is known the gtid.*/
@ -87,7 +83,7 @@ tbrl_update_consistency(
Gtid& gtid) /*!< in: gtid */
{
bool not_found = true;
table_listener_consistency_t *tc=NULL;
tbr_metadata_t *tc=NULL;
// Need to be protected by mutex to avoid concurrency problems
boost::mutex::scoped_lock lock(table_consistency_mutex);
@ -96,7 +92,7 @@ tbrl_update_consistency(
not_found = true;
} else {
// Loop through the consistency values
for(multimap<std::string, table_listener_consistency_t*>::iterator i = table_consistency_map.find(database_dot_table);
for(multimap<std::string, tbr_metadata_t*>::iterator i = table_consistency_map.find(database_dot_table);
i != table_consistency_map.end(); ++i) {
tc = (*i).second;
if (tc->server_id == lheader->server_id) {
@ -108,23 +104,25 @@ tbrl_update_consistency(
if(not_found) {
// Consistency for this table and server not found, insert a record
table_listener_consistency_t* tb_c = (table_listener_consistency_t*) malloc(sizeof(table_listener_consistency_t));
tb_c->database_dot_table = (char *)malloc(database_dot_table.size()+1);
strcpy(tb_c->database_dot_table, database_dot_table.c_str());
tbr_metadata_t* tb_c = (tbr_metadata_t*) malloc(sizeof(tbr_metadata_t));
tb_c->db_table = (unsigned char *)malloc(database_dot_table.size()+1);
strcpy((char *)tb_c->db_table, (char *)database_dot_table.c_str());
tb_c->server_id = lheader->server_id;
tb_c->binlog_pos = lheader->next_position;
tb_c->gtid_known = gtid_known;
tb_c->gtid = (char *)malloc(gtid.get_string().size()+1);
strcpy(tb_c->gtid, gtid.get_string().c_str());
tb_c->gtid_len = gtid.get_gtid_length();
tb_c->gtid = (unsigned char *)malloc(tb_c->gtid_len);
memcpy(tb_c->gtid, gtid.get_gtid(), tb_c->gtid_len);
table_consistency_map.insert(pair<std::string, table_listener_consistency_t*>(database_dot_table,tb_c));
table_consistency_map.insert(pair<std::string, tbr_metadata_t*>(database_dot_table,tb_c));
} else {
// Consistency for this table and server found, update the
// consistency values
tc->binlog_pos = lheader->next_position;
free(tc->gtid);
tc->gtid = (char *)malloc(gtid.get_string().size()+1);
strcpy(tc->gtid, gtid.get_string().c_str());
tc->gtid_len = gtid.get_gtid_length();
tc->gtid = (unsigned char *)malloc(tc->gtid_len);
memcpy(tc->gtid, gtid.get_gtid(), tc->gtid_len);
tc->gtid_known = gtid_known;
}
}
@ -245,7 +243,8 @@ void* tb_replication_listener_reader(
gtid_known = true;
gtid = Gtid(gevent->domain_id, gevent->server_id, gevent->sequence_number);
} else {
// TODO MYSQL
gtid_known = true;
gtid = Gtid(gevent->m_mysql_gtid);
}
std::cout << "Thread: " << id << " server_id " << lheader->server_id
@ -356,6 +355,8 @@ tb_replication_listener_shutdown(
boost::mutex::scoped_lock lock(table_replication_mutex);
map<int, Binary_log*>::iterator b_it;
listener_shutdown = true;
b_it = table_replication_listeners.find(server_id);
if ( b_it != table_replication_listeners.end()) {
@ -408,7 +409,7 @@ status structures. Client must allocate memory for consistency result
array and provide the maximum number of values returned. At return
there is information how many results where available.
@return 0 on success, error code at failure. */
int
int
tb_replication_listener_consistency(
/*================================*/
const char *db_dot_table, /*!< in: Fully qualified table
@ -418,17 +419,17 @@ tb_replication_listener_consistency(
{
bool found = false;
boost::uint32_t cur_server = 0;
table_listener_consistency_t *tc=NULL;
tbr_metadata_t *tc=NULL;
// Need to be protected by mutex to avoid concurrency problems
boost::mutex::scoped_lock lock(table_consistency_mutex);
// Loop through the consistency values
for(multimap<std::string, table_listener_consistency_t*>::iterator i = table_consistency_map.find(db_dot_table);
for(multimap<std::string, tbr_metadata_t*>::iterator i = table_consistency_map.find(db_dot_table);
i != table_consistency_map.end(); ++i, ++cur_server) {
if (cur_server == server_no) {
tc = (*i).second;
memcpy(tb_consistency, tc, sizeof(table_listener_consistency_t));
memcpy(tb_consistency, tc, sizeof(tbr_metadata_t));
found = true;
break;
}
@ -441,7 +442,6 @@ tb_replication_listener_consistency(
}
}
/***********************************************************************//**
This function will reconnect replication listener to a server
provided.
@ -546,6 +546,82 @@ err_exit:
return (1);
}
/***********************************************************************//**
This internal function is executed on its own thread and it will write
table consistency information to the master database in every n seconds
based on configuration.
*/
void
*tb_replication_listener_metadata_updater(
/*======================================*/
void *arg) /*!< in: Master definition */
{
replication_listener_t *master = (replication_listener_t*)arg;
tbr_metadata_t **tm=NULL;
char *body = master->server_url;
size_t len = strlen(master->server_url);
/* Find the beginning of the user name */
strncmp(body, "//", 2);
/* Find the user name, which is mandatory */
const char *user = body + 2;
const char *user_end= strpbrk(user, ":@");
/* Find the password, which can be empty */
assert(*user_end == ':' || *user_end == '@');
const char *const pass = user_end + 1; // Skip the ':' (or '@')
const char *pass_end = pass;
if (*user_end == ':')
{
pass_end = strchr(pass, '@');
}
/* Find the host name, which is mandatory */
// Skip the '@', if there is one
const char *host = *pass_end == '@' ? pass_end + 1 : pass_end;
const char *host_end = strchr(host, ':');
/* If no ':' was found there is no port, so the host end at the end
* of the string */
if (host_end == 0)
host_end = body + len;
/* Find the port number */
unsigned long portno = 3306;
if (*host_end == ':')
portno = strtoul(host_end + 1, NULL, 10);
while(listener_shutdown == false) {
sleep(10000); // Sleep ~10 seconds
{
// Need to be protected by mutex to avoid concurrency problems
boost::mutex::scoped_lock lock(table_consistency_mutex);
size_t nelems = table_consistency_map.size();
size_t k =0;
tm = (tbr_metadata_t**)calloc(nelems, sizeof(tbr_metadata_t*));
for(multimap<std::string, tbr_metadata_t*>::iterator i = table_consistency_map.begin();
i != table_consistency_map.end(); ++i,++k) {
tm[k] = ((*i).second);
}
// Release mutex
lock.unlock();
// Insert or update metadata information
tbrm_write_metadata(host, user, pass, portno, tm, nelems);
}
}
pthread_exit(NULL);
return NULL;
}
} // namespace table_replication_listener
} // namespace mysql

View File

@ -55,7 +55,7 @@ there is information how many results where available.
int
tb_replication_listener_consistency(
/*================================*/
const char *db_dot_table, /*!< in: Fully qualified table
const unsigned char *db_dot_table, /*!< in: Fully qualified table
name. */
table_consistency_t *tb_consistency, /*!< out: Consistency values. */
boost::uint32_t server_no); /*!< in: Server */
@ -79,7 +79,7 @@ int
tb_replication_listener_shutdown(
/*=============================*/
boost::uint32_t server_id, /*!< in: server id */
char **error_message); /*!< out: error message */
char **error_message); /*!< out: error message */
} // table_replication_listener

View File

@ -0,0 +1,509 @@
/*
Copyright (C) 2013, SkySQL Ab
This file is distributed as part of the SkySQL Gateway. It is free
software: you can redistribute it and/or modify it under the terms of the
GNU General Public License as published by the Free Software Foundation,
version 2.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
details.
You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc., 51
Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
Author: Jan Lindström jan.lindstrom@skysql.com
Created: 15-07-2013
Updated:
*/
#include "binlog_api.h"
#include "my_pthread.h"
#include <getopt.h>
#include <iostream>
#include <iomanip>
#include <map>
#include <sstream>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <regex.h>
#include <algorithm>
#include "listener_exception.h"
#include <mysql.h>
#include <mysqld_error.h>
#include "table_replication_metadata.h"
namespace mysql {
namespace table_replication_metadata {
/***********************************************************************//**
*/
static void
tbrm_report_error(
/*==============*/
MYSQL *con,
const char *message,
const char *file,
int line)
{
fprintf(stderr, "%s at file %s line %d\n", message, file, line);
if (con != NULL) {
fprintf(stderr, "%s\n", mysql_error(con));
mysql_close(con);
}
}
/***********************************************************************//**
*/
static void
tbrm_stmt_error(
/*============*/
MYSQL_STMT *stmt,
const char *message,
const char *file,
int line)
{
fprintf (stderr, "%s at file %s line %d\n", message, file, line);
if (stmt != NULL)
{
fprintf (stderr, "Error %u (%s): %s\n",
mysql_stmt_errno (stmt),
mysql_stmt_sqlstate (stmt),
mysql_stmt_error (stmt));
}
}
/***********************************************************************//**
Inspect master data dictionary and if necessary table replication
consistency metadata is not created, create it.
@return false if create failed, true if metadata already created or
create succeeded */
static bool
tbrm_create_metadata(
/*=================*/
const char *master_host,
const char *user,
const char *passwd,
unsigned int master_port)
{
MYSQL *con = mysql_init(NULL);
unsigned int myerrno=0;
if (!con) {
// TODO: start to log error and other messages
return false;
}
if (mysql_real_connect(con, master_host, user, passwd, "mysql", mysql_port, NULL, 0) == NULL) {
tbrm_report_error(con, "Error: mysql_real_connect failed", __FILE__, __LINE__);
goto error_exit;
}
// Check is the database there
mysql_query(con, "USE SKYSQL_GATEWAY_METADATA");
myerrno = mysql_errno(con);
if (myerrno != 0 && myerrno != ER_NO_DB_ERROR) {
tbrm_report_error(con, "Error: mysql_query(USE_SKYSQL_GATEWAY_METADATA) failed", __FILE__, __LINE__);
goto error_exit;
} else if (myerrno == 0) {
// Database found, assuming everyting ok
return true;
}
// Create databse
mysql_query(con, "CREATE DATABASE SKYSQL_GATEWAY_METADATA");
if (mysql_errno(con) != 0) {
tbrm_report_error(con, "mysql_query(CREATE DATABASE SKYSQL_GATEWAY_METADATA) failed", __FILE__, __LINE__);
goto error_exit;
}
// Set correct database
mysql_query(con, "USE SKYSQL_GATEWAY_METADATA");
if (mysql_errno(con) != 0) {
tbrm_report_error(con, "Error: mysql_query(USE_SKYSQL_GATEWAY_METADATA) failed", __FILE__, __LINE__);
goto error_exit;
}
// Create table
mysql_query(con, "CREATE TABLE IF NOT EXISTS TABLE_REPLICATION_CONSISTENCY("
"DB_TABLE_NAME VARCHAR(255) NOT NULL,"
"SERVER_ID INT NOT NULL,"
"GTID VARBINARY(255),"
"BINLOG_POS BIGINT NOT NULL,"
"GTID_KNOWN INT,"
"PRIMARY KEY(DB_TABLE_NAME, SERVER_ID)) ENGINE=InnoDB");
if (mysql_errno(con) != 0) {
tbrm_report_error(con, "Error: Create table failed", __FILE__, __LINE__);
goto error_exit;
}
// Above clauses not really transactional, but lets play safe
mysql_query(con, "COMMIT");
if (mysql_errno(con) != 0) {
tbrm_report_error(con, "Error: Commit failed", __FILE__, __LINE__);
goto error_exit;
}
mysql_close(con);
// Done
return true;
error_exit:
if (con) {
mysql_close(con);
}
return false;
}
/***********************************************************************//**
Read table replication consistency metadata from the MySQL master server.
This function will create necessary database and table if they are not
yet created.
@return false if read failed, true if read succeeded */
bool
tbrm_read_metadata(
/*===============*/
const char *master_host, /*!< in: Master hostname */
const char *user, /*!< in: username */
const char *passwd, /*!< in: password */
unsigned int master_port, /*!< in: master port */
tbr_metadata_t **tbrm_meta, /*!< out: table replication consistency
metadata. */
unsigned int *tbrm_rows) /*!< out: number of rows read */
{
MYSQL *con = mysql_init(NULL);
unsigned int myerrno=0;
boost::uint64_t nrows=0;
boost::uint64_t i=0;
MYSQL_RES *result = NULL;
tbrm_create_metadata(master_host, user, passwd, master_port);
if (!con) {
// TODO: start to log error and other messages
return false;
}
if (mysql_real_connect(con, master_host, user, passwd, "mysql", mysql_port, NULL, 0) == NULL) {
tbrm_report_error(con, "Error: mysql_real_connect failed", __FILE__, __LINE__);
goto error_exit;
}
mysql_query(con, "USE SKYSQL_GATEWAY_METADATA");
myerrno = mysql_errno(con);
if (myerrno != 0) {
tbrm_report_error(con, "Error: Database set failed", __FILE__, __LINE__);
goto error_exit;
}
mysql_query(con, "SELECT * FROM TABLE_REPLICATION_CONSISTENCY");
myerrno = mysql_errno(con);
if (myerrno != 0) {
tbrm_report_error(con,"Error: Select from table_replication_consistency failed", __FILE__, __LINE__);
goto error_exit;
}
result = mysql_store_result(con);
if (!result) {
tbrm_report_error(con, "Error: mysql_store_result failed", __FILE__, __LINE__);
goto error_exit;
}
nrows = mysql_num_rows(result);
*tbrm_meta = (tbr_metadata_t*) calloc(nrows, sizeof(tbr_metadata_t));
*tbrm_rows = nrows;
for(i=0;i < nrows; i++) {
MYSQL_ROW row = mysql_fetch_row(result);
unsigned long *lengths = mysql_fetch_lengths(result);
// DB_TABLE_NAME
tbrm_meta[i]->db_table = (unsigned char *)malloc(lengths[0]);
strcpy((char *)tbrm_meta[i]->db_table, row[0]);
// SERVER_ID
tbrm_meta[i]->server_id = atol(row[1]);
// GTID
tbrm_meta[i]->gtid = (unsigned char *)malloc((lengths[2])*sizeof(unsigned char));
memcpy(tbrm_meta[i]->gtid, row[2], lengths[2]);
tbrm_meta[i]->gtid_len = lengths[2];
// BINLOG_POS
tbrm_meta[i]->binlog_pos = atoll(row[3]);
// GTID_KNOWN
tbrm_meta[i]->gtid_known = atol(row[4]);
}
mysql_free_result(result);
mysql_close(con);
return true;
error_exit:
if (result) {
mysql_free_result(result);
}
if (con) {
mysql_close(con);
}
return false;
}
/***********************************************************************//**
Write table replication consistency metadata from the MySQL master server.
This function assumes that necessary database and table are created.
@return false if read failed, true if read succeeded */
bool
tbrm_write_metadata(
/*================*/
const char *master_host, /*!< in: Master hostname */
const char *user, /*!< in: username */
const char *passwd, /*!< in: password */
unsigned int master_port, /*!< in: master port */
tbr_metadata_t **tbrm_meta, /*!< in: table replication consistency
metadata. */
boost::uint32_t tbrm_rows) /*!< in: number of rows read */
{
MYSQL *con = mysql_init(NULL);
int myerrno=0;
boost::uint32_t i;
MYSQL_STMT *sstmt=NULL;
MYSQL_STMT *istmt=NULL;
MYSQL_STMT *ustmt=NULL;
MYSQL_BIND sparam[2];
MYSQL_BIND iparam[5];
MYSQL_BIND uparam[5];
MYSQL_BIND result[1];
char *dbtable;
void *gtid;
int gtidknown;
int serverid;
boost::uint64_t binlogpos;
// Query to find out if the row already exists on table
const char *sst = "SELECT BINLOG_POS FROM TABLE_REPLICATION_CONSISTENCY WHERE"
" DB_TABLE_NAME='?' and SERVER_ID=?";
// Insert Query
const char *ist = "INSERT INTO TABLE_REPLICATION_CONSISTENCY(DB_TABLE_NAME,"
" SERVER_ID, GTID, BINLOG_POS, GTID_KNOWN) VALUES"
"(?, ?, ?, ?, ?)";
// Update Query
const char *ust = "UPDATE TABLE_REPLICATION_CONSISTENCY "
"SET GTID=?, BINLOG_POS=?, GTID_KNOWN=?"
" WHERE DB_TABLE_NAME=? AND SERVER_ID=?";
if (!con) {
// TODO: start to log error and other messages
return false;
}
if (mysql_real_connect(con, master_host, user, passwd, "mysql", mysql_port, NULL, 0) == NULL) {
tbrm_report_error(con, "Error: mysql_real_connect failed", __FILE__, __LINE__);
}
mysql_query(con, "USE SKYSQL_GATEWAY_METADATA");
myerrno = mysql_errno(con);
if (myerrno != 0) {
tbrm_report_error(con, "Error: Database set failed", __FILE__, __LINE__);
}
// Allocate statement handlers
sstmt = mysql_stmt_init(con);
istmt = mysql_stmt_init(con);
ustmt = mysql_stmt_init(con);
if (sstmt == NULL || istmt == NULL || ustmt == NULL) {
tbrm_report_error(con, "Could not initialize statement handler", __FILE__, __LINE__);
goto error_exit;
}
// Prepare the statements
if (mysql_stmt_prepare(sstmt, sst, strlen(sst)) != 0) {
tbrm_stmt_error(sstmt, "Error: Could not prepare select statement", __FILE__, __LINE__);
goto error_exit;
}
if (mysql_stmt_prepare(istmt, ist, strlen(ist)) != 0) {
tbrm_stmt_error(istmt, "Error: Could not prepare insert statement", __FILE__, __LINE__);
goto error_exit;
}
if (mysql_stmt_prepare(ustmt, ust, strlen(ust)) != 0) {
tbrm_stmt_error(ustmt, "Error: Could not prepare update statement", __FILE__, __LINE__);
goto error_exit;
}
// Initialize the parameters
memset (sparam, 0, sizeof (sparam));
memset (iparam, 0, sizeof (iparam));
memset (uparam, 0, sizeof (uparam));
memset (result, 0, sizeof (result));
// Init param structure
// Select
sparam[0].buffer_type = MYSQL_TYPE_VARCHAR;
sparam[0].buffer = (void *) dbtable;
sparam[1].buffer_type = MYSQL_TYPE_LONG;
sparam[1].buffer = (void *) &serverid;
// Insert
iparam[0].buffer_type = MYSQL_TYPE_VARCHAR;
iparam[0].buffer = (void *) dbtable;
iparam[1].buffer_type = MYSQL_TYPE_LONG;
iparam[1].buffer = (void *) &serverid;
iparam[2].buffer_type = MYSQL_TYPE_BLOB;
iparam[2].buffer = (void *) gtid;
iparam[3].buffer_type = MYSQL_TYPE_LONGLONG;
iparam[3].buffer = (void *) &binlogpos;
iparam[4].buffer_type = MYSQL_TYPE_SHORT;
iparam[4].buffer = (void *) &gtidknown;
// Update
uparam[0].buffer_type = MYSQL_TYPE_BLOB;
uparam[0].buffer = (void *) gtid;
uparam[1].buffer_type = MYSQL_TYPE_LONGLONG;
uparam[1].buffer = (void *) &binlogpos;
uparam[2].buffer_type = MYSQL_TYPE_SHORT;
uparam[2].buffer = (void *) &gtidknown;
uparam[3].buffer_type = MYSQL_TYPE_VARCHAR;
uparam[3].buffer = (void *) dbtable;
uparam[4].buffer_type = MYSQL_TYPE_LONG;
uparam[4].buffer = (void *) &serverid;
// Result set for select
result[0].buffer_type = MYSQL_TYPE_LONGLONG;
result[0].buffer = &binlogpos;
// Iterate through the data
for(i = 0; i < tbrm_rows; i++) {
// Start from Select, we need to know if the consistency
// information for this table, server pair is already
// in metadata or not.
dbtable = (char *)tbrm_meta[i]->db_table;
gtid = (char *)tbrm_meta[i]->gtid;
gtidknown = tbrm_meta[i]->gtid_known;
binlogpos = tbrm_meta[i]->binlog_pos;
serverid = tbrm_meta[i]->server_id;
sparam[0].buffer_length = strlen(dbtable);
uparam[3].buffer_length = sparam[0].buffer_length;
iparam[0].buffer_length = sparam[0].buffer_length;
uparam[0].buffer_length = tbrm_meta[i]->gtid_len;
iparam[2].buffer_length = tbrm_meta[i]->gtid_len;
// Bind param structure to statement
if (mysql_stmt_bind_param(sstmt, sparam) != 0) {
tbrm_stmt_error(sstmt, "Error: Could not bind select parameters", __FILE__, __LINE__);
goto error_exit;
}
// Bind result structure to statement
if (mysql_stmt_bind_result(sstmt, result) != 0) {
tbrm_stmt_error(sstmt, "Error: Could not bind select return parameters", __FILE__, __LINE__);
goto error_exit;
}
// Execute!!
if (mysql_stmt_execute(sstmt) != 0) {
tbrm_stmt_error(sstmt, "Error: Could not execute select statement", __FILE__, __LINE__);
goto error_exit;
}
// Store result
if (mysql_stmt_store_result(sstmt) != 0) {
tbrm_stmt_error(sstmt, "Error: Could not buffer result set", __FILE__, __LINE__);
goto error_exit;
}
// Fetch result
myerrno = mysql_stmt_fetch(sstmt);
if (myerrno == 1) {
tbrm_stmt_error(sstmt, "Error: Could not fetch result set", __FILE__, __LINE__);
goto error_exit;
}
// If fetch returned 0, it means that this table, serverid
// pair was found from metadata, we might need to update
// the consistency information.
if (myerrno == 0 && binlogpos != tbrm_meta[i]->binlog_pos) {
// Update the consistency information
binlogpos = tbrm_meta[i]->binlog_pos;
// Bind param structure to statement
if (mysql_stmt_bind_param(ustmt, uparam) != 0) {
tbrm_stmt_error(ustmt, "Error: Could not bind update parameters", __FILE__, __LINE__);
goto error_exit;
}
// Execute!!
if (mysql_stmt_execute(ustmt) != 0) {
tbrm_stmt_error(ustmt, "Error: Could not execute update statement", __FILE__, __LINE__);
goto error_exit;
}
} else {
// Insert the consistency information
binlogpos = tbrm_meta[i]->binlog_pos;
// Bind param structure to statement
if (mysql_stmt_bind_param(istmt, iparam) != 0) {
tbrm_stmt_error(istmt, "Error: Could not bind insert parameters", __FILE__, __LINE__);
goto error_exit;
}
// Execute!!
if (mysql_stmt_execute(istmt) != 0) {
tbrm_stmt_error(istmt, "Error: Could not execute insert statement", __FILE__, __LINE__);
goto error_exit;
}
}
}
return true;
error_exit:
// Cleanup
if (sstmt) {
if (mysql_stmt_close(sstmt)) {
tbrm_stmt_error(sstmt, "Error: Could not close select statement", __FILE__, __LINE__);
}
}
if (istmt) {
if (mysql_stmt_close(istmt)) {
tbrm_stmt_error(istmt, "Error: Could not close select statement", __FILE__, __LINE__);
}
}
if (ustmt) {
if (mysql_stmt_close(ustmt)) {
tbrm_stmt_error(ustmt, "Error: Could not close select statement", __FILE__, __LINE__);
}
}
if (con) {
mysql_close(con);
}
return false;
}
} // table_replication_metadata
} // mysql

View File

@ -0,0 +1,70 @@
/*
Copyright (C) 2013, SkySQL Ab
This file is distributed as part of the SkySQL Gateway. It is free
software: you can redistribute it and/or modify it under the terms of the
GNU General Public License as published by the Free Software Foundation,
version 2.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
details.
You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc., 51
Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
Author: Jan Lindström jan.lindstrom@skysql.com
Created: 15-07-2013
Updated:
*/
#ifndef TABLE_REPLICATION_METADATA_H
#define TABLE_REPLICATION_METADATA_H
/* Structure definition for table replication oconsistency metadata */
typedef struct {
unsigned char* db_table; /* Fully qualified db.table name,
primary key. */
boost::uint32_t server_id; /* Server id */
unsigned char* gtid; /* Global transaction id */
boost::uint32_t gtid_len; /* Length of gtid */
boost::uint64_t binlog_pos; /* Binlog position */
bool gtid_known; /* Is gtid known ? */
} tbr_metadata_t;
/***********************************************************************//**
Read table replication consistency metadata from the MySQL master server.
This function assumes that necessary database and table are created.
@return false if read failed, true if read succeeded */
bool
tbrm_read_metadata(
/*===============*/
const char *master_host, /*!< in: Master hostname */
const char *user, /*!< in: username */
const char *passwd, /*!< in: password */
unsigned int master_port, /*!< in: master port */
tbr_metadata_t **tbrm_meta, /*!< out: table replication consistency
metadata. */
size_t *tbrm_rows); /*!< out: number of rows read */
/***********************************************************************//**
Write table replication consistency metadata from the MySQL master server.
This function assumes that necessary database and table are created.
@return false if read failed, true if read succeeded */
bool
tbrm_write_metadata(
/*================*/
const char *master_host, /*!< in: Master hostname */
const char *user, /*!< in: username */
const char *passwd, /*!< in: password */
unsigned int master_port, /*!< in: master port */
tbr_metadata_t **tbrm_meta, /*!< in: table replication consistency
metadata. */
size_t tbrm_rows); /*!< in: number of rows read */
#endif