Added servers metadata and fixed issues on embedded server connection and test program compilation and linking.

This commit is contained in:
Jan Lindström 2013-07-25 17:59:55 +03:00
parent 1c3e44d446
commit b3b444880d
10 changed files with 701 additions and 82 deletions

View File

@ -5,18 +5,18 @@ link_directories(${PROJECT_BUILD_DIR}/lib)
include_directories(${PROJECT_BUILD_DIR}/include)
# Find MySQL client library and header files
find_library(MySQL_LIBRARY NAMES libmysqld.a PATHS
/usr/lib64/mysql /usr/lib/mysql /usr/local/mysql/lib)
find_path(MySQL_INCLUDE_DIR mysql.h
/usr/local/include/mysql /usr/include/mysql)
include_directories(${MySQL_INCLUDE_DIR})
include_directories(../../table_replication_consistency)
include_directories(../../utils)
# Create build rules for all the simple examples that only require a
# single file.
foreach(prog basic-1 basic-2 jan_test)
ADD_EXECUTABLE(${prog} ${prog}.cpp)
TARGET_LINK_LIBRARIES(${prog} replication boost_system pthread aio crypt ${MySQL_LIBRARY})
ADD_EXECUTABLE(${prog} ${prog}.cpp /usr/local/mysql/lib/libmysqld.a)
TARGET_LINK_LIBRARIES(${prog} replication boost_system boost_thread pthread aio crypt ${MySQL_LIBRARY})
endforeach()
add_subdirectory(mysql2lucene EXCLUDE_FROM_ALL)

View File

@ -20,7 +20,7 @@ using namespace std;
using namespace mysql::system;
static char* server_options[] = {
"jan test",
"jan_test",
"--datadir=/tmp/",
"--skip-innodb",
"--default-storage-engine=myisam",
@ -30,11 +30,10 @@ static char* server_options[] = {
const int num_elements = (sizeof(server_options) / sizeof(char *)) - 1;
static char* server_groups[] = {
"embedded",
"server",
"server",
"server",
NULL
"libmysqld_server",
"libmysqld_client",
"libmysqld_server",
"libmysqld_server", NULL
};
void* binlog_reader(void * arg)
@ -46,11 +45,11 @@ void* binlog_reader(void * arg)
pthread_t id = pthread_self();
string database_dot_table;
const char* server_type;
Gtid gtid("62cda1d0e3a011e289d76ac0855a31e8:10");
Gtid gtid();
try {
Binary_log binlog(create_transport(uri));
binlog.connect(gtid);
binlog.connect();
server_type = binlog.get_mysql_server_type_str();
@ -90,7 +89,7 @@ void* binlog_reader(void * arg)
<< " position " << lheader->next_position << " : Found event of type "
<< event->get_event_type()
<< " txt " << get_event_type_str(event->get_event_type())
<< " GTID " << std::string(gevent->m_gtid.get_mysql_gtid())
<< " GTID " << std::string((char *)gevent->m_gtid.get_gtid())
<< " GTID " << gevent->m_gtid.get_string()
<< std::endl;
@ -177,7 +176,7 @@ int main(int argc, char** argv) {
exit(2);
}
if (mysql_server_init(num_elements, server_options, server_groups)) {
if (mysql_library_init(num_elements, server_options, server_groups)) {
std::cerr << "Failed to init MySQL server" << std::endl;
exit(1);
}

View File

@ -239,7 +239,8 @@ int Binlog_tcp_driver::fetch_server_version(const std::string& user,
throw(ListenerException(std::string("mysql_init() failed"), __FILE__, __LINE__));
}
mysql_options(mysql, MYSQL_READ_DEFAULT_GROUP, "client");
mysql_options(mysql, MYSQL_READ_DEFAULT_GROUP, "libmysqld_client");
mysql_options(mysql, MYSQL_OPT_RECONNECT, &reconnect);
mysql_options(mysql, MYSQL_OPT_USE_REMOTE_CONNECTION, NULL);

View File

@ -111,6 +111,25 @@ tb_replication_consistency_init(
tbr_trace = true;
}
// Find out the master server
for(i=0;i < n_servers; i++) {
if (rpl[i].is_master) {
break;
}
}
// If master is found read metadata from MySQL server, if not report error
if (i < n_servers) {
char *errm = NULL;
if(!tb_replication_listener_init(&(rpl[i]), &errm)) {
errmsg = std::string(errm);
free(errm);
}
} else {
errmsg = string("Master server is missing from configuration");
goto error_handling;
}
// Start replication stream reader thread for every server in the configuration
for(i=0;i < n_servers; i++) {
// We need to try catch all exceptions here because function
@ -135,6 +154,7 @@ tb_replication_consistency_init(
}
// Start actual replication listener
/*
err = pthread_create(
&replication_listener_tid[i],
NULL,
@ -145,6 +165,7 @@ tb_replication_consistency_init(
errmsg = string(strerror(err));
goto error_handling;
}
*/
}
// Replication listener will use this exception for

View File

@ -57,25 +57,31 @@ namespace table_replication_listener {
because same table can be found from several servers. */
multimap<std::string, tbr_metadata_t*> table_consistency_map;
boost::mutex table_consistency_mutex; /* This mutex is used protect
boost::mutex table_consistency_mutex; /* This mutex is used to protect
above data structure from
multiple threads */
/* We use this map to store constructed binary log connections */
map<int, Binary_log*> table_replication_listeners;
boost::mutex table_replication_mutex; /* This mutex is used protect
abve data structure from
boost::mutex table_replication_mutex; /* This mutex is used to protect
above data structure from
multiple threads */
/* We use this map to store table consistency server metadata */
map<boost::uint32_t, tbr_server_t*> table_replication_servers;
boost::mutex table_servers_mutex; /* This mutex is used to proted
above data structure from multiple
threads */
replication_listener_t *master; /* Master server definition */
/* Master connect info */
const char *master_user=NULL;
const char *master_passwd=NULL;
const char *master_host=NULL;
unsigned long master_port=3306;
char *master_user=NULL;
char *master_passwd=NULL;
char *master_host=NULL;
unsigned long master_port=3307;
/***********************************************************************//**
Internal function to extract user, passwd, hostname and port from
@ -87,33 +93,50 @@ tbrl_extract_master_connect_info()
char *body = master->server_url;
size_t len = strlen(master->server_url);
/* Find the beginning of the user name */
strncmp(body, "//", 2);
/* Find the user name, which is mandatory */
const char *user = body + 8;
/* Find the user name, which is mandatory */
master_user = body + 2;
const char *user_end= strpbrk(master_user, ":@");
const char *user_end= strpbrk(user, ":@");
/* Find the password, which can be empty */
assert(*user_end == ':' || *user_end == '@');
master_passwd = user_end + 1; // Skip the ':' (or '@')
const char *pass_end = master_passwd;
if (*user_end == ':')
{
pass_end = strchr(master_passwd, '@');
}
assert(user_end - user >= 1); // There has to be a username
/* Find the host name, which is mandatory */
// Skip the '@', if there is one
master_host = *pass_end == '@' ? pass_end + 1 : pass_end;
const char *host_end = strchr(master_host, ':');
/* If no ':' was found there is no port, so the host end at the end
* of the string */
if (host_end == 0)
host_end = body + len;
/* Find the port number */
if (*host_end == ':')
master_port = strtoul(host_end + 1, NULL, 10);
/* Find the password, which can be empty */
assert(*user_end == ':' || *user_end == '@');
const char *const pass = user_end + 1; // Skip the ':' (or '@')
const char *pass_end = pass;
if (*user_end == ':')
{
pass_end = strchr(pass, '@');
}
assert(pass_end - pass >= 0); // Password can be empty
/* Find the host name, which is mandatory */
// Skip the '@', if there is one
const char *host = *pass_end == '@' ? pass_end + 1 : pass_end;
const char *host_end = strchr(host, ':');
if (host == host_end)
/* If no ':' was found there is no port, so the host end at the end
* of the string */
if (host_end == 0)
host_end = body + len;
assert(host_end - host >= 1); // There has to be a host
/* Find the port number */
unsigned long portno = 3307;
if (*host_end == ':')
portno = strtoul(host_end + 1, NULL, 10);
std::string u(user, user_end - user);
std::string p(pass, pass_end - pass);
std::string h(host, host_end - host);
master_user = (char *)malloc(u.length()+1);
master_passwd = (char *)malloc(p.length()+1);
master_host = (char *)malloc(h.length()+1);
strcpy(master_user, u.c_str());
strcpy(master_passwd, p.c_str());
strcpy(master_host, h.c_str());
master_port = portno;
}
/***********************************************************************//**
@ -180,6 +203,58 @@ tbrl_update_consistency(
}
/***********************************************************************//**
Internal function to update table replication consistency server status
based on log event header and gtid if known*/
static void
tbrl_update_server_status(
/*======================*/
Log_event_header *lheader, /*!< in: Log event header */
bool gtid_known, /*!< in: is GTID known */
Gtid& gtid) /*!< in: gtid */
{
bool not_found = true;
tbr_server_t *ts=NULL;
// Need to be protected by mutex to avoid concurrency problems
boost::mutex::scoped_lock lock(table_servers_mutex);
if(table_replication_servers.find(lheader->server_id) == table_replication_servers.end()) {
not_found = true;
} else {
not_found = false;
}
if(not_found) {
// Consistency for this server not found, insert a record
ts = (tbr_server_t*) malloc(sizeof(tbr_server_t));
ts->server_id = lheader->server_id;
ts->binlog_pos = lheader->next_position;
ts->gtid_known = gtid_known;
ts->gtid_len = gtid.get_gtid_length();
ts->gtid = (unsigned char *)malloc(ts->gtid_len);
memcpy(ts->gtid, gtid.get_gtid(), ts->gtid_len);
table_replication_servers.insert(pair<boost::uint32_t, tbr_server_t*>(lheader->server_id, ts));
} else {
// Consistency for this server found, update the consistency values
ts->binlog_pos = lheader->next_position;
free(ts->gtid);
ts->gtid_len = gtid.get_gtid_length();
ts->gtid = (unsigned char *)malloc(ts->gtid_len);
memcpy(ts->gtid, gtid.get_gtid(), ts->gtid_len);
ts->gtid_known = gtid_known;
}
if (tbr_trace) {
// This will log error to log file
skygw_log_write_flush(NULL, LOGFILE_TRACE,
(char *)"TRC Trace: Current state for server %d binlog_pos %lu GTID '%s'",
ts->server_id, ts->binlog_pos, gtid.get_string().c_str());
}
}
/***********************************************************************//**
This is the function that is executed by replication listeners.
At startup it will try to connect the server and start listening
@ -204,7 +279,7 @@ void* tb_replication_listener_reader(
try {
Binary_log binlog(create_transport(uri), uri);
binlog.connect(gtid);
binlog.connect();
{
// Need to be protected by mutex to avoid concurrency problems
@ -238,6 +313,9 @@ void* tb_replication_listener_reader(
lheader = event->header();
// Insert or update current server status
tbrl_update_server_status(lheader, gtid_known, gtid);
switch(event->get_event_type()) {
case QUERY_EVENT: {
@ -680,7 +758,7 @@ void
tbrl_extract_master_connect_info();
while(listener_shutdown == false) {
sleep(10000); // Sleep ~10 seconds
sleep(10); // Sleep ~10 seconds
try {
// Need to be protected by mutex to avoid concurrency problems
@ -702,11 +780,11 @@ void
lock.unlock();
// Insert or update metadata information
err = tbrm_write_metadata(
err = tbrm_write_consistency_metadata(
(const char *)master_host,
(const char *)master_user,
(const char *)master_passwd,
(unsigned int)master_port,
master_port,
tm,
nelems);
@ -750,6 +828,80 @@ my_exit:
return NULL;
}
/***********************************************************************//**
Read current state of the metadata from the MySQL server or create
necessary metadata and initialize listener metadata.
@return true on success, false on failure
*/
bool
tb_replication_listener_init(
/*=========================*/
replication_listener_t* rpl, /*! in: Master server definition */
char **error_message) /*!< out: error message */
{
tbr_metadata_t *tm = NULL;
size_t tm_rows = 0;
std::string dbtable;
std::string err;
master = rpl;
// Set up master connect info
tbrl_extract_master_connect_info();
try {
if (!tbrm_read_consistency_metadata((const char *)master_host,
(const char *)master_user,
(const char *)master_passwd,
(unsigned int)master_port,
&tm,
&tm_rows)) {
err = std::string("Error: reading metadata failed");
goto error_exit;
}
for(size_t i=0;i < tm_rows; i++) {
tbr_metadata_t *t = &(tm[i]);
dbtable = std::string((char *)t->db_table);
table_consistency_map.insert(pair<std::string, tbr_metadata_t*>(dbtable, t));
}
}
catch(ListenerException e)
{
err = std::string("Listener exception: ")+ e.what();
skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str());
goto error_exit;
}
catch(boost::system::error_code e)
{
err = std::string("Listener system exception: ")+ e.message();
skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str());
goto error_exit;
}
// Try and catch all exceptions
catch(std::exception const& e)
{
err = std::string("Listener other exception: ")+ e.what();
skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str());
goto error_exit;
}
// Rest of them
catch(...)
{
err = std::string("Unknown exception: ");
skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str());
goto error_exit;
}
return true;
error_exit:
*error_message = (char *)malloc(err.length()+1);
strcpy(*error_message, err.c_str());
return false;
}
/***********************************************************************//**
Write current state of the metadata to the MySQL server and
clean up the data structures.
@ -773,7 +925,13 @@ tb_replication_listener_done(
}
// Insert or update metadata information
err = tbrm_write_metadata(master_host, master_user, master_passwd, master_port, tm, (size_t)nelems);
err = tbrm_write_consistency_metadata(
(const char *)master_host,
(const char *)master_user,
(const char *)master_passwd,
(unsigned int)master_port,
tm,
(size_t)nelems);
free(tm);

View File

@ -91,6 +91,17 @@ void
/*======================================*/
void *arg); /*!< in: Master definition */
/***********************************************************************//**
Read current state of the metadata from the MySQL server or create
necessary metadata and initialize listener metadata.
@return true on success, false on failure
*/
bool
tb_replication_listener_init(
/*=========================*/
replication_listener_t* rpl, /*! in: Master server definition */
char **error_message); /*!< out: error message */
/***********************************************************************//**
Write current state of the metadata to the MySQL server and
clean up the data structures.

View File

@ -102,11 +102,15 @@ tbrm_create_metadata(
unsigned int myerrno=0;
if (!con) {
// TODO: start to log error and other messages
skygw_log_write_flush(NULL, LOGFILE_ERROR,
(char *)"Mysql init failed", mysql_error(con));
return false;
}
if (mysql_real_connect(con, master_host, user, passwd, "mysql", mysql_port, NULL, 0) == NULL) {
mysql_options(con, MYSQL_READ_DEFAULT_GROUP, "libmysqld_client");
mysql_options(con, MYSQL_OPT_USE_REMOTE_CONNECTION, NULL);
if (!mysql_real_connect(con, master_host, user, passwd, NULL, master_port, NULL, 0)) {
tbrm_report_error(con, "Error: mysql_real_connect failed", __FILE__, __LINE__);
goto error_exit;
}
@ -115,12 +119,12 @@ tbrm_create_metadata(
mysql_query(con, "USE SKYSQL_GATEWAY_METADATA");
myerrno = mysql_errno(con);
if (myerrno != 0 && myerrno != ER_NO_DB_ERROR) {
tbrm_report_error(con, "Error: mysql_query(USE_SKYSQL_GATEWAY_METADATA) failed", __FILE__, __LINE__);
goto error_exit;
} else if (myerrno == 0) {
if (myerrno == 0) {
// Database found, assuming everyting ok
return true;
} else if (myerrno != ER_BAD_DB_ERROR) {
tbrm_report_error(con, "Error: mysql_query(USE_SKYSQL_GATEWAY_METADATA) failed", __FILE__, __LINE__);
goto error_exit;
}
// Create databse
@ -139,8 +143,8 @@ tbrm_create_metadata(
goto error_exit;
}
// Create table
mysql_query(con, "CREATE TABLE IF NOT EXISTS TABLE_REPLICATION_CONSISTENCY("
// Create consistency table
mysql_query(con, "CREATE TABLE TABLE_REPLICATION_CONSISTENCY("
"DB_TABLE_NAME VARCHAR(255) NOT NULL,"
"SERVER_ID INT NOT NULL,"
"GTID VARBINARY(255),"
@ -161,6 +165,28 @@ tbrm_create_metadata(
goto error_exit;
}
// Create servers table
mysql_query(con, "CREATE TABLE TABLE_REPLICATION_SERVERS("
"SERVER_ID INT NOT NULL,"
"BINLOG_POS BIGINT NOT NULL,"
"GTID VARBINARY(255),"
"GTID_KNOWN INT,"
"SERVER_TYPE INT,"
"PRIMARY KEY(SERVER_ID)) ENGINE=InnoDB");
if (mysql_errno(con) != 0) {
tbrm_report_error(con, "Error: Create table failed", __FILE__, __LINE__);
goto error_exit;
}
// Above clauses not really transactional, but lets play safe
mysql_query(con, "COMMIT");
if (mysql_errno(con) != 0) {
tbrm_report_error(con, "Error: Commit failed", __FILE__, __LINE__);
goto error_exit;
}
mysql_close(con);
// Done
@ -181,17 +207,16 @@ This function will create necessary database and table if they are not
yet created.
@return false if read failed, true if read succeeded */
bool
tbrm_read_metadata(
/*===============*/
tbrm_read_consistency_metadata(
/*===========================*/
const char *master_host, /*!< in: Master hostname */
const char *user, /*!< in: username */
const char *passwd, /*!< in: password */
unsigned int master_port, /*!< in: master port */
tbr_metadata_t **tbrm_meta, /*!< out: table replication consistency
metadata. */
unsigned int *tbrm_rows) /*!< out: number of rows read */
size_t *tbrm_rows) /*!< out: number of rows read */
{
MYSQL *con = mysql_init(NULL);
unsigned int myerrno=0;
boost::uint64_t nrows=0;
boost::uint64_t i=0;
@ -199,12 +224,17 @@ tbrm_read_metadata(
tbrm_create_metadata(master_host, user, passwd, master_port);
MYSQL *con = mysql_init(NULL);
if (!con) {
// TODO: start to log error and other messages
return false;
}
if (mysql_real_connect(con, master_host, user, passwd, "mysql", mysql_port, NULL, 0) == NULL) {
mysql_options(con, MYSQL_READ_DEFAULT_GROUP, "libmysqld_client");
mysql_options(con, MYSQL_OPT_USE_REMOTE_CONNECTION, NULL);
if (!mysql_real_connect(con, master_host, user, passwd, NULL, master_port, NULL, 0)) {
tbrm_report_error(con, "Error: mysql_real_connect failed", __FILE__, __LINE__);
goto error_exit;
}
@ -273,14 +303,13 @@ tbrm_read_metadata(
return false;
}
/***********************************************************************//**
Write table replication consistency metadata from the MySQL master server.
This function assumes that necessary database and table are created.
@return false if read failed, true if read succeeded */
bool
tbrm_write_metadata(
/*================*/
tbrm_write_consistency_metadata(
/*============================*/
const char *master_host, /*!< in: Master hostname */
const char *user, /*!< in: username */
const char *passwd, /*!< in: password */
@ -289,7 +318,6 @@ tbrm_write_metadata(
metadata. */
size_t tbrm_rows) /*!< in: number of rows read */
{
MYSQL *con = mysql_init(NULL);
int myerrno=0;
boost::uint32_t i;
MYSQL_STMT *sstmt=NULL;
@ -319,13 +347,19 @@ tbrm_write_metadata(
"SET GTID=?, BINLOG_POS=?, GTID_KNOWN=?"
" WHERE DB_TABLE_NAME=? AND SERVER_ID=?";
MYSQL *con = mysql_init(NULL);
if (!con) {
// TODO: start to log error and other messages
return false;
}
if (mysql_real_connect(con, master_host, user, passwd, "mysql", mysql_port, NULL, 0) == NULL) {
mysql_options(con, MYSQL_READ_DEFAULT_GROUP, "libmysqld_client");
mysql_options(con, MYSQL_OPT_USE_REMOTE_CONNECTION, NULL);
if (!mysql_real_connect(con, master_host, user, passwd, NULL, master_port, NULL, 0)) {
tbrm_report_error(con, "Error: mysql_real_connect failed", __FILE__, __LINE__);
goto error_exit;
}
mysql_query(con, "USE SKYSQL_GATEWAY_METADATA");
@ -407,7 +441,6 @@ tbrm_write_metadata(
dbtable = (char *)tbrm_meta[i]->db_table;
gtid = (char *)tbrm_meta[i]->gtid;
gtidknown = tbrm_meta[i]->gtid_known;
binlogpos = tbrm_meta[i]->binlog_pos;
serverid = tbrm_meta[i]->server_id;
sparam[0].buffer_length = strlen(dbtable);
@ -523,6 +556,356 @@ tbrm_write_metadata(
}
/***********************************************************************//**
Read table replication server metadata from the MySQL master server.
This function will create necessary database and table if they are not
yet created.
@return false if read failed, true if read succeeded */
bool
tbrm_read_server_metadata(
/*======================*/
const char *master_host, /*!< in: Master hostname */
const char *user, /*!< in: username */
const char *passwd, /*!< in: password */
unsigned int master_port, /*!< in: master port */
tbr_server_t **tbrm_servers,/*!< out: table replication server
metadata. */
size_t *tbrm_rows) /*!< out: number of rows read */
{
unsigned int myerrno=0;
boost::uint64_t nrows=0;
boost::uint64_t i=0;
MYSQL_RES *result = NULL;
tbrm_create_metadata(master_host, user, passwd, master_port);
MYSQL *con = mysql_init(NULL);
if (!con) {
// TODO: start to log error and other messages
return false;
}
mysql_options(con, MYSQL_READ_DEFAULT_GROUP, "libmysqld_client");
mysql_options(con, MYSQL_OPT_USE_REMOTE_CONNECTION, NULL);
if (!mysql_real_connect(con, master_host, user, passwd, NULL, master_port, NULL, 0)) {
tbrm_report_error(con, "Error: mysql_real_connect failed", __FILE__, __LINE__);
goto error_exit;
}
mysql_query(con, "USE SKYSQL_GATEWAY_METADATA");
myerrno = mysql_errno(con);
if (myerrno != 0) {
tbrm_report_error(con, "Error: Database set failed", __FILE__, __LINE__);
goto error_exit;
}
mysql_query(con, "SELECT * FROM TABLE_REPLICATION_SERVERS");
myerrno = mysql_errno(con);
if (myerrno != 0) {
tbrm_report_error(con,"Error: Select from table_replication_consistency failed", __FILE__, __LINE__);
goto error_exit;
}
result = mysql_store_result(con);
if (!result) {
tbrm_report_error(con, "Error: mysql_store_result failed", __FILE__, __LINE__);
goto error_exit;
}
nrows = mysql_num_rows(result);
*tbrm_servers = (tbr_server_t*) calloc(nrows, sizeof(tbr_server_t));
*tbrm_rows = nrows;
for(i=0;i < nrows; i++) {
MYSQL_ROW row = mysql_fetch_row(result);
unsigned long *lengths = mysql_fetch_lengths(result);
// SERVER_ID
tbrm_servers[i]->server_id = atol(row[0]);
// BINLOG_POS
tbrm_servers[i]->binlog_pos = atoll(row[1]);
// GTID
tbrm_servers[i]->gtid = (unsigned char *)malloc((lengths[2])*sizeof(unsigned char));
memcpy(tbrm_servers[i]->gtid, row[2], lengths[2]);
tbrm_servers[i]->gtid_len = lengths[2];
// GTID_KNOWN
tbrm_servers[i]->gtid_known = atol(row[3]);
// SERVER_TYPE
tbrm_servers[i]->server_type = atol(row[4]);
}
mysql_free_result(result);
mysql_close(con);
return true;
error_exit:
if (result) {
mysql_free_result(result);
}
if (con) {
mysql_close(con);
}
return false;
}
/***********************************************************************//**
Write table replication server metadata from the MySQL master server.
This function assumes that necessary database and table are created.
@return false if read failed, true if read succeeded */
bool
tbrm_write_server_metadata(
/*=======================*/
const char *master_host, /*!< in: Master hostname */
const char *user, /*!< in: username */
const char *passwd, /*!< in: password */
unsigned int master_port, /*!< in: master port */
tbr_server_t **tbrm_servers,/*!< in: table replication server
metadata. */
size_t tbrm_rows) /*!< in: number of rows read */
{
int myerrno=0;
boost::uint32_t i;
MYSQL_STMT *sstmt=NULL;
MYSQL_STMT *istmt=NULL;
MYSQL_STMT *ustmt=NULL;
MYSQL_BIND sparam[1];
MYSQL_BIND iparam[5];
MYSQL_BIND uparam[4];
MYSQL_BIND result[1];
char *dbtable;
void *gtid;
int gtidknown;
unsigned int serverid;
int servertype;
boost::uint64_t binlogpos;
// Query to find out if the row already exists on table
const char *sst = "SELECT BINLOG_POS FROM TABLE_REPLICATION_CONSISTENCY WHERE"
" SERVER_ID=?";
// Insert Query
const char *ist = "INSERT INTO TABLE_REPLICATION_SERVERS("
" SERVER_ID, GTID, BINLOG_POS, GTID_KNOWN, SERVER_TYPE) VALUES"
"(?, ?, ?, ?, ?)";
// Update Query
const char *ust = "UPDATE TABLE_REPLICATION_SERVERS "
"SET GTID=?, BINLOG_POS=?, GTID_KNOWN=?"
" WHERE SERVER_ID=?";
MYSQL *con = mysql_init(NULL);
if (!con) {
// TODO: start to log error and other messages
return false;
}
mysql_options(con, MYSQL_READ_DEFAULT_GROUP, "libmysqld_client");
mysql_options(con, MYSQL_OPT_USE_REMOTE_CONNECTION, NULL);
if (!mysql_real_connect(con, master_host, user, passwd, NULL, master_port, NULL, 0)) {
tbrm_report_error(con, "Error: mysql_real_connect failed", __FILE__, __LINE__);
goto error_exit;
}
mysql_query(con, "USE SKYSQL_GATEWAY_METADATA");
myerrno = mysql_errno(con);
if (myerrno != 0) {
tbrm_report_error(con, "Error: Database set failed", __FILE__, __LINE__);
}
// Allocate statement handlers
sstmt = mysql_stmt_init(con);
istmt = mysql_stmt_init(con);
ustmt = mysql_stmt_init(con);
if (sstmt == NULL || istmt == NULL || ustmt == NULL) {
tbrm_report_error(con, "Could not initialize statement handler", __FILE__, __LINE__);
goto error_exit;
}
// Prepare the statements
if (mysql_stmt_prepare(sstmt, sst, strlen(sst)) != 0) {
tbrm_stmt_error(sstmt, "Error: Could not prepare select statement", __FILE__, __LINE__);
goto error_exit;
}
if (mysql_stmt_prepare(istmt, ist, strlen(ist)) != 0) {
tbrm_stmt_error(istmt, "Error: Could not prepare insert statement", __FILE__, __LINE__);
goto error_exit;
}
if (mysql_stmt_prepare(ustmt, ust, strlen(ust)) != 0) {
tbrm_stmt_error(ustmt, "Error: Could not prepare update statement", __FILE__, __LINE__);
goto error_exit;
}
// Initialize the parameters
memset (sparam, 0, sizeof (sparam));
memset (iparam, 0, sizeof (iparam));
memset (uparam, 0, sizeof (uparam));
memset (result, 0, sizeof (result));
// Init param structure
// Select
sparam[0].buffer_type = MYSQL_TYPE_LONG;
sparam[0].buffer = (void *) &serverid;
// Insert
iparam[0].buffer_type = MYSQL_TYPE_LONG;
iparam[0].buffer = (void *) &serverid;
iparam[1].buffer_type = MYSQL_TYPE_BLOB;
iparam[1].buffer = (void *) gtid;
iparam[2].buffer_type = MYSQL_TYPE_LONGLONG;
iparam[2].buffer = (void *) &binlogpos;
iparam[3].buffer_type = MYSQL_TYPE_SHORT;
iparam[3].buffer = (void *) &gtidknown;
iparam[4].buffer_type = MYSQL_TYPE_LONG;
iparam[4].buffer = (void *) &servertype;
// Update
uparam[0].buffer_type = MYSQL_TYPE_BLOB;
uparam[0].buffer = (void *) gtid;
uparam[1].buffer_type = MYSQL_TYPE_LONGLONG;
uparam[1].buffer = (void *) &binlogpos;
uparam[2].buffer_type = MYSQL_TYPE_SHORT;
uparam[2].buffer = (void *) &gtidknown;
uparam[3].buffer_type = MYSQL_TYPE_LONG;
uparam[3].buffer = (void *) &serverid;
// Result set for select
result[0].buffer_type = MYSQL_TYPE_LONGLONG;
result[0].buffer = &binlogpos;
// Iterate through the data
for(i = 0; i < tbrm_rows; i++) {
// Start from Select, we need to know if the consistency
// information for this table, server pair is already
// in metadata or not.
gtid = (char *)tbrm_servers[i]->gtid;
gtidknown = tbrm_servers[i]->gtid_known;
serverid = tbrm_servers[i]->server_id;
servertype = tbrm_servers[i]->server_type;
uparam[0].buffer_length = tbrm_servers[i]->gtid_len;
iparam[1].buffer_length = tbrm_servers[i]->gtid_len;
// Bind param structure to statement
if (mysql_stmt_bind_param(sstmt, sparam) != 0) {
tbrm_stmt_error(sstmt, "Error: Could not bind select parameters", __FILE__, __LINE__);
goto error_exit;
}
// Bind result structure to statement
if (mysql_stmt_bind_result(sstmt, result) != 0) {
tbrm_stmt_error(sstmt, "Error: Could not bind select return parameters", __FILE__, __LINE__);
goto error_exit;
}
// Execute!!
if (mysql_stmt_execute(sstmt) != 0) {
tbrm_stmt_error(sstmt, "Error: Could not execute select statement", __FILE__, __LINE__);
goto error_exit;
}
// Store result
if (mysql_stmt_store_result(sstmt) != 0) {
tbrm_stmt_error(sstmt, "Error: Could not buffer result set", __FILE__, __LINE__);
goto error_exit;
}
// Fetch result
myerrno = mysql_stmt_fetch(sstmt);
if (myerrno == 1) {
tbrm_stmt_error(sstmt, "Error: Could not fetch result set", __FILE__, __LINE__);
goto error_exit;
}
// If fetch returned 0, it means that this table, serverid
// pair was found from metadata, we might need to update
// the consistency information.
if (myerrno == 0 && binlogpos != tbrm_servers[i]->binlog_pos) {
// Update the consistency information
binlogpos = tbrm_servers[i]->binlog_pos;
// Bind param structure to statement
if (mysql_stmt_bind_param(ustmt, uparam) != 0) {
tbrm_stmt_error(ustmt, "Error: Could not bind update parameters", __FILE__, __LINE__);
goto error_exit;
}
// Execute!!
if (mysql_stmt_execute(ustmt) != 0) {
tbrm_stmt_error(ustmt, "Error: Could not execute update statement", __FILE__, __LINE__);
goto error_exit;
}
if (tbr_debug) {
skygw_log_write_flush(NULL, LOGFILE_TRACE,
(char *)"TRC Debug: Metadata state updated for %s in server %d is binlog_pos %lu gtid '%s'",
dbtable, serverid, binlogpos, gtid);
}
} else {
// Insert the consistency information
binlogpos = tbrm_servers[i]->binlog_pos;
// Bind param structure to statement
if (mysql_stmt_bind_param(istmt, iparam) != 0) {
tbrm_stmt_error(istmt, "Error: Could not bind insert parameters", __FILE__, __LINE__);
goto error_exit;
}
// Execute!!
if (mysql_stmt_execute(istmt) != 0) {
tbrm_stmt_error(istmt, "Error: Could not execute insert statement", __FILE__, __LINE__);
goto error_exit;
}
if (tbr_debug) {
skygw_log_write_flush(NULL, LOGFILE_TRACE,
(char *)"TRC Debug: Metadata state inserted for %s in server %d is binlog_pos %lu gtid '%s'",
dbtable, serverid, binlogpos, gtid);
}
}
}
return true;
error_exit:
// Cleanup
if (sstmt) {
if (mysql_stmt_close(sstmt)) {
tbrm_stmt_error(sstmt, "Error: Could not close select statement", __FILE__, __LINE__);
}
}
if (istmt) {
if (mysql_stmt_close(istmt)) {
tbrm_stmt_error(istmt, "Error: Could not close select statement", __FILE__, __LINE__);
}
}
if (ustmt) {
if (mysql_stmt_close(ustmt)) {
tbrm_stmt_error(ustmt, "Error: Could not close select statement", __FILE__, __LINE__);
}
}
if (con) {
mysql_close(con);
}
return false;
}
} // table_replication_metadata
} // mysql

View File

@ -29,7 +29,7 @@ namespace mysql {
namespace table_replication_metadata {
/* Structure definition for table replication oconsistency metadata */
/* Structure definition for table replication consistency metadata */
typedef struct {
unsigned char* db_table; /* Fully qualified db.table name,
primary key. */
@ -40,13 +40,30 @@ typedef struct {
bool gtid_known; /* Is gtid known ? */
} tbr_metadata_t;
/* Structure definition for table replication server metadata */
typedef struct {
boost::uint32_t server_id; /* Server id, primary key*/
boost::uint64_t binlog_pos; /* Last executed binlog position */
unsigned char* gtid; /* Last executed global transaction
id if known */
boost::uint32_t gtid_len; /* Actual length of gtid */
bool gtid_known; /* 1 if gtid known, 0 if not */
boost::uint32_t server_type; /* server type */
} tbr_server_t;
// Not really nice, but currently we support only these two
// server types.
#define TRC_SERVER_TYPE_MARIADB = 1,
#define TRC_SERVER_TYPE_MYSQL = 2
/***********************************************************************//**
Read table replication consistency metadata from the MySQL master server.
This function assumes that necessary database and table are created.
@return false if read failed, true if read succeeded */
bool
tbrm_read_metadata(
/*===============*/
tbrm_read_consistency_metadata(
/*===========================*/
const char *master_host, /*!< in: Master hostname */
const char *user, /*!< in: username */
const char *passwd, /*!< in: password */
@ -55,13 +72,28 @@ tbrm_read_metadata(
metadata. */
size_t *tbrm_rows); /*!< out: number of rows read */
/***********************************************************************//**
Read table replication server metadata from the MySQL master server.
This function assumes that necessary database and table are created.
@return false if read failed, true if read succeeded */
bool
tbrm_read_server_metadata(
/*======================*/
const char *master_host, /*!< in: Master hostname */
const char *user, /*!< in: username */
const char *passwd, /*!< in: password */
unsigned int master_port, /*!< in: master port */
tbr_server_t **tbrm_server, /*!< out: table replication server
metadata. */
size_t *tbrm_rows); /*!< out: number of rows read */
/***********************************************************************//**
Write table replication consistency metadata from the MySQL master server.
This function assumes that necessary database and table are created.
@return false if read failed, true if read succeeded */
bool
tbrm_write_metadata(
/*================*/
tbrm_write_consistency_metadata(
/*============================*/
const char *master_host, /*!< in: Master hostname */
const char *user, /*!< in: username */
const char *passwd, /*!< in: password */
@ -70,6 +102,22 @@ tbrm_write_metadata(
metadata. */
size_t tbrm_rows); /*!< in: number of rows read */
/***********************************************************************//**
Write table replication server metadata from the MySQL master server.
This function assumes that necessary database and table are created.
@return false if read failed, true if read succeeded */
bool
tbrm_write_server_metadata(
/*=======================*/
const char *master_host, /*!< in: Master hostname */
const char *user, /*!< in: username */
const char *passwd, /*!< in: password */
unsigned int master_port, /*!< in: master port */
tbr_server_t **tbrm_server, /*!< out: table replication server
metadata. */
size_t *tbrm_rows); /*!< out: number of rows read */
} // table_replication_metadata
} // mysql

View File

@ -7,8 +7,6 @@ LINK_DIRECTORIES(${Boost_LIBRARY_DIRS})
INCLUDE_DIRECTORIES(${Boost_INCLUDE_DIR})
# Find MySQL client library and header files
find_library(MySQL_LIBRARY NAMES libmysqld.a PATHS
/usr/lib64/mysql /usr/lib/mysql /usr/local/mysql/lib)
find_path(MySQL_INCLUDE_DIR mysql.h
/usr/local/include/mysql /usr/include/mysql)
include_directories(${MySQL_INCLUDE_DIR})
@ -24,11 +22,11 @@ include_directories(${TRC_INCLUDE_DIR})
# Build rule for example
foreach(prog Example)
ADD_EXECUTABLE(${prog} ${prog}.c ../../utils/skygw_utils.o )
TARGET_LINK_LIBRARIES(${prog} table_replication_consistency.a replication boost_system pthread stdc++ ${MySQL_LIBRARY} crypt aio log_manager)
ADD_EXECUTABLE(${prog} ${prog}.c ../../utils/skygw_utils.o /usr/local/mysql/lib/libmysqld.a)
TARGET_LINK_LIBRARIES(${prog} table_replication_consistency.a replication boost_system boost_thread pthread stdc++ crypt aio log_manager)
endforeach()
foreach(prog test)
ADD_EXECUTABLE(${prog} ${prog}.cpp ../../utils/skygw_utils.o )
TARGET_LINK_LIBRARIES(${prog} table_replication_consistency.a replication boost_system pthread stdc++ ${MySQL_LIBRARY} crypt aio log_manager)
ADD_EXECUTABLE(${prog} ${prog}.cpp ../../utils/skygw_utils.o /usr/local/mysql/lib/libmysqld.a)
TARGET_LINK_LIBRARIES(${prog} table_replication_consistency.a replication boost_system boost_thread pthread stdc++ crypt aio log_manager)
endforeach()