Added servers metadata reading and writing.

This commit is contained in:
Jan Lindström
2013-07-26 13:46:31 +03:00
parent b3b444880d
commit 4a4383f6aa
8 changed files with 498 additions and 113 deletions

View File

@ -154,7 +154,6 @@ tb_replication_consistency_init(
} }
// Start actual replication listener // Start actual replication listener
/*
err = pthread_create( err = pthread_create(
&replication_listener_tid[i], &replication_listener_tid[i],
NULL, NULL,
@ -165,7 +164,6 @@ tb_replication_consistency_init(
errmsg = string(strerror(err)); errmsg = string(strerror(err));
goto error_handling; goto error_handling;
} }
*/
} }
// Replication listener will use this exception for // Replication listener will use this exception for
@ -345,7 +343,7 @@ err_exit:
/***********************************************************************//** /***********************************************************************//**
This function is to shutdown the replication listener and free all This function is to shutdown the replication listener and free all
resources on table consistency. This function (TODO) will store resources on table consistency. This function will store
the current status on metadata to MySQL server. the current status on metadata to MySQL server.
@return 0 on success, error code at failure. */ @return 0 on success, error code at failure. */
int int

View File

@ -153,7 +153,7 @@ tb_replication_consistency_reconnect(
/***********************************************************************//** /***********************************************************************//**
This function is to shutdown the replication listener and free all This function is to shutdown the replication listener and free all
resources on table consistency. This function (TODO) will store resources on table consistency. This function will store
the current status on metadata to MySQL server. the current status on metadata to MySQL server.
@return 0 on success, error code at failure. */ @return 0 on success, error code at failure. */
int int

View File

@ -156,17 +156,26 @@ tbrl_update_consistency(
// Need to be protected by mutex to avoid concurrency problems // Need to be protected by mutex to avoid concurrency problems
boost::mutex::scoped_lock lock(table_consistency_mutex); boost::mutex::scoped_lock lock(table_consistency_mutex);
if(table_consistency_map.find(database_dot_table) == table_consistency_map.end()) { multimap<std::string, tbr_metadata_t*>::iterator key = table_consistency_map.find(database_dot_table);
if( key == table_consistency_map.end()) {
not_found = true; not_found = true;
} else { } else {
// Loop through the consistency values // Loop through the consistency values
for(multimap<std::string, tbr_metadata_t*>::iterator i = table_consistency_map.find(database_dot_table); for(multimap<std::string, tbr_metadata_t*>::iterator i = key;
i != table_consistency_map.end(); ++i) { i != table_consistency_map.end(); ++i) {
tc = (*i).second; tc = (*i).second;
if (tc->server_id == lheader->server_id) { if (tc->server_id == lheader->server_id) {
not_found = false; not_found = false;
break; break;
} }
// If the next table name is not anymore the same,
// we can safely exit from the loop, names are ordered
if (strcpy((char *)tc->db_table, (char *)database_dot_table.c_str()) != 0) {
not_found = true;
break;
}
} }
} }
@ -219,9 +228,13 @@ tbrl_update_server_status(
// Need to be protected by mutex to avoid concurrency problems // Need to be protected by mutex to avoid concurrency problems
boost::mutex::scoped_lock lock(table_servers_mutex); boost::mutex::scoped_lock lock(table_servers_mutex);
if(table_replication_servers.find(lheader->server_id) == table_replication_servers.end()) { // Try to find out the servre metadata
map<uint32_t, tbr_server_t*>::iterator key = table_replication_servers.find(lheader->server_id);
if( key == table_replication_servers.end()) {
not_found = true; not_found = true;
} else { } else {
ts = (*key).second;
not_found = false; not_found = false;
} }
@ -446,6 +459,20 @@ void* tb_replication_listener_reader(
database_dot_table= tb_it->second; database_dot_table= tb_it->second;
} }
if (tbr_debug) {
skygw_log_write_flush(NULL, LOGFILE_TRACE,
(char *)"TRC Debug: Thread %ld Server %d Binlog_pos %lu event %d"
" : %s dbtable '%s' id %d",
id,
lheader->server_id,
lheader->next_position,
revent->get_event_type(),
get_event_type_str(revent->get_event_type()),
database_dot_table.c_str(),
revent->table_id);
}
// Update the consistency information // Update the consistency information
tbrl_update_consistency(lheader, database_dot_table, gtid_known, gtid); tbrl_update_consistency(lheader, database_dot_table, gtid_known, gtid);
@ -503,7 +530,7 @@ void* tb_replication_listener_reader(
/***********************************************************************//** /***********************************************************************//**
This function is to shutdown the replication listener and free all This function is to shutdown the replication listener and free all
resources on table consistency. This function (TODO) will store resources on table consistency. This function will store
the current status on metadata to MySQL server. the current status on metadata to MySQL server.
@return 0 on success, error code at failure. */ @return 0 on success, error code at failure. */
int int
@ -752,6 +779,7 @@ void
{ {
master = (replication_listener_t*)arg; master = (replication_listener_t*)arg;
tbr_metadata_t **tm=NULL; tbr_metadata_t **tm=NULL;
tbr_server_t **ts=NULL;
bool err = false; bool err = false;
// Set up master connect info // Set up master connect info
@ -761,35 +789,83 @@ void
sleep(10); // Sleep ~10 seconds sleep(10); // Sleep ~10 seconds
try { try {
// Need to be protected by mutex to avoid concurrency problems size_t nelems;
boost::mutex::scoped_lock lock(table_consistency_mutex);
size_t nelems = table_consistency_map.size(); // This scope for scoped mutexing
size_t k =0; {
// Need to be protected by mutex to avoid concurrency problems
boost::mutex::scoped_lock lock(table_consistency_mutex);
tm = (tbr_metadata_t**)calloc(nelems, sizeof(tbr_metadata_t*)); nelems = table_consistency_map.size();
size_t k =0;
for(multimap<std::string, tbr_metadata_t*>::iterator i = table_consistency_map.begin(); tm = (tbr_metadata_t**)calloc(nelems, sizeof(tbr_metadata_t*));
i != table_consistency_map.end(); ++i,++k) {
tm[k] = ((*i).second); if (!tm) {
skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)"Error: TRM: Out of memory");
goto my_exit;
}
for(multimap<std::string, tbr_metadata_t*>::iterator i = table_consistency_map.begin();
i != table_consistency_map.end(); ++i,++k) {
tm[k] = ((*i).second);
}
} }
// Release mutex
lock.unlock();
// Insert or update metadata information // Insert or update metadata information
err = tbrm_write_consistency_metadata( if (!tbrm_write_consistency_metadata(
(const char *)master_host, (const char *)master_host,
(const char *)master_user, (const char *)master_user,
(const char *)master_passwd, (const char *)master_passwd,
master_port, master_port,
tm, tm,
nelems); nelems)) {
goto my_exit;
}
free(tm); free(tm);
tm = NULL;
// This scope for scoped mutexing
{
// Need to be protected by mutex to avoid
// concurrency problems
boost::mutex::scoped_lock lock(table_servers_mutex);
nelems = table_replication_servers.size();
size_t k =0;
ts = (tbr_server_t**)calloc(nelems, sizeof(tbr_server_t*));
if (!ts) {
skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)"Error: TRM: Out of memory");
goto my_exit;
}
for(map<boost::uint32_t, tbr_server_t*>::iterator i = table_replication_servers.begin();
i != table_replication_servers.end(); ++i,++k) {
ts[k] = ((*i).second);
}
}
// Insert or update metadata information
if (!tbrm_write_server_metadata(
(const char *)master_host,
(const char *)master_user,
(const char *)master_passwd,
master_port,
ts,
nelems)) {
goto my_exit;
}
free(ts);
ts = NULL;
} }
catch(ListenerException e) catch(ListenerException e)
{ {
@ -820,6 +896,15 @@ void
} }
my_exit: my_exit:
if (tm) {
free(tm);
}
if (ts) {
free(ts);
}
if (tbr_trace) { if (tbr_trace) {
skygw_log_write_flush(NULL, LOGFILE_TRACE, (char *)"Shutting down the metadata updater thread"); skygw_log_write_flush(NULL, LOGFILE_TRACE, (char *)"Shutting down the metadata updater thread");
} }
@ -840,6 +925,7 @@ tb_replication_listener_init(
char **error_message) /*!< out: error message */ char **error_message) /*!< out: error message */
{ {
tbr_metadata_t *tm = NULL; tbr_metadata_t *tm = NULL;
tbr_server_t *ts=NULL;
size_t tm_rows = 0; size_t tm_rows = 0;
std::string dbtable; std::string dbtable;
std::string err; std::string err;
@ -856,7 +942,7 @@ tb_replication_listener_init(
(unsigned int)master_port, (unsigned int)master_port,
&tm, &tm,
&tm_rows)) { &tm_rows)) {
err = std::string("Error: reading metadata failed"); err = std::string("Error: reading table consistency metadata failed");
goto error_exit; goto error_exit;
} }
@ -866,6 +952,24 @@ tb_replication_listener_init(
table_consistency_map.insert(pair<std::string, tbr_metadata_t*>(dbtable, t)); table_consistency_map.insert(pair<std::string, tbr_metadata_t*>(dbtable, t));
} }
if (!tbrm_read_server_metadata(
(const char *)master_host,
(const char *)master_user,
(const char *)master_passwd,
(unsigned int)master_port,
&ts,
&tm_rows)) {
err = std::string("Error: reading table servers metadata failed");
goto error_exit;
}
for(size_t i=0;i < tm_rows; i++) {
tbr_server_t *t = &(ts[i]);
table_replication_servers.insert(pair<uint32_t, tbr_server_t*>(t->server_id, t));
}
} }
catch(ListenerException e) catch(ListenerException e)
{ {
@ -912,32 +1016,41 @@ tb_replication_listener_done(
char **error_message) /*!< out: error message */ char **error_message) /*!< out: error message */
{ {
size_t nelems = table_consistency_map.size(); size_t nelems = table_consistency_map.size();
size_t nelems2 = table_replication_servers.size();
size_t k =0; size_t k =0;
tbr_metadata_t **tm=NULL; tbr_metadata_t **tm=NULL;
int err = 0; tbr_server_t **ts=NULL;
bool err = false;
tm = (tbr_metadata_t**)calloc(nelems, sizeof(tbr_metadata_t*)); tm = (tbr_metadata_t**)calloc(nelems, sizeof(tbr_metadata_t*));
ts = (tbr_server_t **)calloc(nelems2, sizeof(tbr_server_t*));
if (tm == NULL || ts == NULL) {
skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)"TRM: Out of memory");
goto error_exit;
}
try { try {
k = 0;
for(multimap<std::string, tbr_metadata_t*>::iterator i = table_consistency_map.begin(); for(multimap<std::string, tbr_metadata_t*>::iterator i = table_consistency_map.begin();
i != table_consistency_map.end(); ++i,++k) { i != table_consistency_map.end(); ++i,++k) {
tm[k] = ((*i).second); tm[k] = ((*i).second);
} }
// Insert or update metadata information // Insert or update table consistency metadata information
err = tbrm_write_consistency_metadata( if (!tbrm_write_consistency_metadata(
(const char *)master_host, (const char *)master_host,
(const char *)master_user, (const char *)master_user,
(const char *)master_passwd, (const char *)master_passwd,
(unsigned int)master_port, (unsigned int)master_port,
tm, tm,
(size_t)nelems); nelems)) {
goto error_exit;
free(tm); }
// Clean up memory allocation for multimap items // Clean up memory allocation for multimap items
for(multimap<std::string, tbr_metadata_t*>::iterator i = table_consistency_map.begin(); for(multimap<std::string, tbr_metadata_t*>::iterator i = table_consistency_map.begin();
i != table_consistency_map.end(); ++i,++k) { i != table_consistency_map.end(); ++i) {
tbr_metadata_t *trm = ((*i).second); tbr_metadata_t *trm = ((*i).second);
free(trm->db_table); free(trm->db_table);
@ -947,6 +1060,34 @@ tb_replication_listener_done(
free(trm); free(trm);
} }
k=0;
for(map<uint32_t, tbr_server_t*>::iterator i = table_replication_servers.begin();
i != table_replication_servers.end(); ++i,++k) {
ts[k] = ((*i).second);
}
// Insert or update table server metadata information
if (!tbrm_write_server_metadata(
(const char *)master_host,
(const char *)master_user,
(const char *)master_passwd,
(unsigned int)master_port,
ts,
nelems2)) {
goto error_exit;
}
// Clean up memory allocation for multimap items
for(map<uint32_t, tbr_server_t*>::iterator j = table_replication_servers.begin();
j != table_replication_servers.end(); ++j) {
tbr_server_t *trs = ((*j).second);
free(trs->gtid);
table_replication_servers.erase(j);
free(trs);
}
// Clean up binlog listeners // Clean up binlog listeners
table_replication_listeners.erase(table_replication_listeners.begin(), table_replication_listeners.end()); table_replication_listeners.erase(table_replication_listeners.begin(), table_replication_listeners.end());
} }
@ -954,27 +1095,45 @@ tb_replication_listener_done(
{ {
string err = std::string("Listener exception: ")+ e.what(); string err = std::string("Listener exception: ")+ e.what();
skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str()); skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str());
goto error_exit;
} }
catch(boost::system::error_code e) catch(boost::system::error_code e)
{ {
string err = std::string("Listener system exception: ")+ e.message(); string err = std::string("Listener system exception: ")+ e.message();
skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str()); skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str());
goto error_exit;
} }
// Try and catch all exceptions // Try and catch all exceptions
catch(std::exception const& e) catch(std::exception const& e)
{ {
string err = std::string("Listener other exception: ")+ e.what(); string err = std::string("Listener other exception: ")+ e.what();
skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str()); skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str());
goto error_exit;
} }
// Rest of them // Rest of them
catch(...) catch(...)
{ {
string err = std::string("Unknown exception: "); string err = std::string("Unknown exception: ");
skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str()); skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str());
goto error_exit;
} }
if (tbr_trace) { if (tbr_trace) {
skygw_log_write_flush(NULL, LOGFILE_TRACE, (char *)"Shutting down the listeners"); skygw_log_write_flush(NULL, LOGFILE_TRACE, (char *)"Shutting down the listeners");
goto error_exit;
}
free(tm);
free(ts);
return err;
error_exit:
if (tm) {
free(tm);
}
if (ts) {
free(ts);
} }
return err; return err;

View File

@ -44,14 +44,15 @@ namespace mysql {
namespace table_replication_metadata { namespace table_replication_metadata {
/***********************************************************************//** /***********************************************************************//**
Internal function to write error messages to the log file.
*/ */
static void static void
tbrm_report_error( tbrm_report_error(
/*==============*/ /*==============*/
MYSQL *con, MYSQL *con, /*!< in: MySQL connection */
const char *message, const char *message, /*!< in: Error message */
const char *file, const char *file, /*!< in: File name */
int line) int line) /*!< in: Line number */
{ {
skygw_log_write_flush(NULL, LOGFILE_ERROR, skygw_log_write_flush(NULL, LOGFILE_ERROR,
(char *)"%s at file %s line %d", message, file, line); (char *)"%s at file %s line %d", message, file, line);
@ -63,14 +64,15 @@ tbrm_report_error(
} }
/***********************************************************************//** /***********************************************************************//**
Internal function to write statement error messages to the log file.
*/ */
static void static void
tbrm_stmt_error( tbrm_stmt_error(
/*============*/ /*============*/
MYSQL_STMT *stmt, MYSQL_STMT *stmt, /*!< in: MySQL statement */
const char *message, const char *message, /*!< in: Error message */
const char *file, const char *file, /*!< in: File name */
int line) int line) /*!< in: Line number */
{ {
skygw_log_write_flush(NULL, LOGFILE_ERROR, skygw_log_write_flush(NULL, LOGFILE_ERROR,
(char *)"%s at file %s line %d", message, file, line); (char *)"%s at file %s line %d", message, file, line);
@ -93,10 +95,10 @@ create succeeded */
static bool static bool
tbrm_create_metadata( tbrm_create_metadata(
/*=================*/ /*=================*/
const char *master_host, const char *master_host, /*!< in: Master host name */
const char *user, const char *user, /*!< in: Username */
const char *passwd, const char *passwd, /*!< in: Passwd */
unsigned int master_port) unsigned int master_port) /*!< in: Master port */
{ {
MYSQL *con = mysql_init(NULL); MYSQL *con = mysql_init(NULL);
unsigned int myerrno=0; unsigned int myerrno=0;
@ -215,19 +217,21 @@ tbrm_read_consistency_metadata(
unsigned int master_port, /*!< in: master port */ unsigned int master_port, /*!< in: master port */
tbr_metadata_t **tbrm_meta, /*!< out: table replication consistency tbr_metadata_t **tbrm_meta, /*!< out: table replication consistency
metadata. */ metadata. */
size_t *tbrm_rows) /*!< out: number of rows read */ size_t *tbrm_rows) /*!< out: number of rows read */
{ {
unsigned int myerrno=0; unsigned int myerrno=0;
boost::uint64_t nrows=0; boost::uint64_t nrows=0;
boost::uint64_t i=0; boost::uint64_t i=0;
MYSQL_RES *result = NULL; MYSQL_RES *result = NULL;
tbr_metadata_t *tm=NULL;
tbrm_create_metadata(master_host, user, passwd, master_port); tbrm_create_metadata(master_host, user, passwd, master_port);
MYSQL *con = mysql_init(NULL); MYSQL *con = mysql_init(NULL);
if (!con) { if (!con) {
// TODO: start to log error and other messages skygw_log_write_flush(NULL, LOGFILE_ERROR,
(char *)"Error: MySQL init failed");
return false; return false;
} }
@ -264,25 +268,49 @@ tbrm_read_consistency_metadata(
nrows = mysql_num_rows(result); nrows = mysql_num_rows(result);
*tbrm_meta = (tbr_metadata_t*) calloc(nrows, sizeof(tbr_metadata_t)); tm = (tbr_metadata_t*) malloc(nrows * sizeof(tbr_metadata_t));
if (!tm) {
skygw_log_write_flush(NULL, LOGFILE_ERROR,
(char *)"Error: Out of memory");
goto error_exit;
}
memset(tm, 0, nrows * sizeof(tbr_metadata_t));
*tbrm_rows = nrows; *tbrm_rows = nrows;
*tbrm_meta = tm;
for(i=0;i < nrows; i++) { for(i=0;i < nrows; i++) {
MYSQL_ROW row = mysql_fetch_row(result); MYSQL_ROW row = mysql_fetch_row(result);
unsigned long *lengths = mysql_fetch_lengths(result); unsigned long *lengths = mysql_fetch_lengths(result);
// DB_TABLE_NAME // DB_TABLE_NAME
tbrm_meta[i]->db_table = (unsigned char *)malloc(lengths[0]); tm[i].db_table = (unsigned char *)malloc(lengths[0]);
strcpy((char *)tbrm_meta[i]->db_table, row[0]);
if (!tm[i].db_table) {
skygw_log_write_flush(NULL, LOGFILE_ERROR,
(char *)"Error: Out of memory");
goto error_exit;
}
strcpy((char *)tm[i].db_table, row[0]);
// SERVER_ID // SERVER_ID
tbrm_meta[i]->server_id = atol(row[1]); tm[i].server_id = atol(row[1]);
// GTID // GTID
tbrm_meta[i]->gtid = (unsigned char *)malloc((lengths[2])*sizeof(unsigned char)); tm[i].gtid = (unsigned char *)malloc((lengths[2])*sizeof(unsigned char));
memcpy(tbrm_meta[i]->gtid, row[2], lengths[2]);
tbrm_meta[i]->gtid_len = lengths[2]; if (!tm[i].gtid) {
free(tm[i].db_table);
skygw_log_write_flush(NULL, LOGFILE_ERROR,
(char *)"Error: Out of memory");
goto error_exit;
}
memcpy(tm[i].gtid, row[2], lengths[2]);
tm[i].gtid_len = lengths[2];
// BINLOG_POS // BINLOG_POS
tbrm_meta[i]->binlog_pos = atoll(row[3]); tm[i].binlog_pos = atoll(row[3]);
// GTID_KNOWN // GTID_KNOWN
tbrm_meta[i]->gtid_known = atol(row[4]); tm[i].gtid_known = atol(row[4]);
} }
mysql_free_result(result); mysql_free_result(result);
@ -292,6 +320,16 @@ tbrm_read_consistency_metadata(
error_exit: error_exit:
if (tm) {
for(size_t k=0;i < i; k++) {
free(tm[k].db_table);
free(tm[k].gtid);
}
free(tm);
*tbrm_rows = 0;
*tbrm_meta = NULL;
}
if (result) { if (result) {
mysql_free_result(result); mysql_free_result(result);
} }
@ -316,7 +354,7 @@ tbrm_write_consistency_metadata(
unsigned int master_port, /*!< in: master port */ unsigned int master_port, /*!< in: master port */
tbr_metadata_t **tbrm_meta, /*!< in: table replication consistency tbr_metadata_t **tbrm_meta, /*!< in: table replication consistency
metadata. */ metadata. */
size_t tbrm_rows) /*!< in: number of rows read */ size_t tbrm_rows) /*!< in: number of rows read */
{ {
int myerrno=0; int myerrno=0;
boost::uint32_t i; boost::uint32_t i;
@ -327,15 +365,15 @@ tbrm_write_consistency_metadata(
MYSQL_BIND iparam[5]; MYSQL_BIND iparam[5];
MYSQL_BIND uparam[5]; MYSQL_BIND uparam[5];
MYSQL_BIND result[1]; MYSQL_BIND result[1];
char *dbtable; char *dbtable=NULL;
void *gtid; void *gtid=NULL;
int gtidknown; int gtidknown;
int serverid; int serverid;
boost::uint64_t binlogpos; boost::uint64_t binlogpos;
// Query to find out if the row already exists on table // Query to find out if the row already exists on table
const char *sst = "SELECT BINLOG_POS FROM TABLE_REPLICATION_CONSISTENCY WHERE" const char *sst = "SELECT BINLOG_POS FROM TABLE_REPLICATION_CONSISTENCY WHERE"
" DB_TABLE_NAME='?' and SERVER_ID=?"; " DB_TABLE_NAME=? and SERVER_ID=?";
// Insert Query // Insert Query
const char *ist = "INSERT INTO TABLE_REPLICATION_CONSISTENCY(DB_TABLE_NAME," const char *ist = "INSERT INTO TABLE_REPLICATION_CONSISTENCY(DB_TABLE_NAME,"
@ -350,7 +388,8 @@ tbrm_write_consistency_metadata(
MYSQL *con = mysql_init(NULL); MYSQL *con = mysql_init(NULL);
if (!con) { if (!con) {
// TODO: start to log error and other messages skygw_log_write_flush(NULL, LOGFILE_ERROR,
(char *)"Mysql init failed", mysql_error(con));
return false; return false;
} }
@ -402,29 +441,24 @@ tbrm_write_consistency_metadata(
// Init param structure // Init param structure
// Select // Select
sparam[0].buffer_type = MYSQL_TYPE_VARCHAR; sparam[0].buffer_type = MYSQL_TYPE_VARCHAR;
sparam[0].buffer = (void *) dbtable;
sparam[1].buffer_type = MYSQL_TYPE_LONG; sparam[1].buffer_type = MYSQL_TYPE_LONG;
sparam[1].buffer = (void *) &serverid; sparam[1].buffer = (void *) &serverid;
// Insert // Insert
iparam[0].buffer_type = MYSQL_TYPE_VARCHAR; iparam[0].buffer_type = MYSQL_TYPE_VARCHAR;
iparam[0].buffer = (void *) dbtable;
iparam[1].buffer_type = MYSQL_TYPE_LONG; iparam[1].buffer_type = MYSQL_TYPE_LONG;
iparam[1].buffer = (void *) &serverid; iparam[1].buffer = (void *) &serverid;
iparam[2].buffer_type = MYSQL_TYPE_BLOB; iparam[2].buffer_type = MYSQL_TYPE_BLOB;
iparam[2].buffer = (void *) gtid;
iparam[3].buffer_type = MYSQL_TYPE_LONGLONG; iparam[3].buffer_type = MYSQL_TYPE_LONGLONG;
iparam[3].buffer = (void *) &binlogpos; iparam[3].buffer = (void *) &binlogpos;
iparam[4].buffer_type = MYSQL_TYPE_SHORT; iparam[4].buffer_type = MYSQL_TYPE_SHORT;
iparam[4].buffer = (void *) &gtidknown; iparam[4].buffer = (void *) &gtidknown;
// Update // Update
uparam[0].buffer_type = MYSQL_TYPE_BLOB; uparam[0].buffer_type = MYSQL_TYPE_BLOB;
uparam[0].buffer = (void *) gtid;
uparam[1].buffer_type = MYSQL_TYPE_LONGLONG; uparam[1].buffer_type = MYSQL_TYPE_LONGLONG;
uparam[1].buffer = (void *) &binlogpos; uparam[1].buffer = (void *) &binlogpos;
uparam[2].buffer_type = MYSQL_TYPE_SHORT; uparam[2].buffer_type = MYSQL_TYPE_SHORT;
uparam[2].buffer = (void *) &gtidknown; uparam[2].buffer = (void *) &gtidknown;
uparam[3].buffer_type = MYSQL_TYPE_VARCHAR; uparam[3].buffer_type = MYSQL_TYPE_VARCHAR;
uparam[3].buffer = (void *) dbtable;
uparam[4].buffer_type = MYSQL_TYPE_LONG; uparam[4].buffer_type = MYSQL_TYPE_LONG;
uparam[4].buffer = (void *) &serverid; uparam[4].buffer = (void *) &serverid;
// Result set for select // Result set for select
@ -442,7 +476,12 @@ tbrm_write_consistency_metadata(
gtid = (char *)tbrm_meta[i]->gtid; gtid = (char *)tbrm_meta[i]->gtid;
gtidknown = tbrm_meta[i]->gtid_known; gtidknown = tbrm_meta[i]->gtid_known;
serverid = tbrm_meta[i]->server_id; serverid = tbrm_meta[i]->server_id;
uparam[3].buffer = (void *) dbtable;
sparam[0].buffer = (void *) dbtable;
uparam[0].buffer = (void *) gtid;
iparam[0].buffer = (void *) dbtable;
iparam[2].buffer = (void *) gtid;
sparam[0].buffer_length = strlen(dbtable); sparam[0].buffer_length = strlen(dbtable);
uparam[3].buffer_length = sparam[0].buffer_length; uparam[3].buffer_length = sparam[0].buffer_length;
iparam[0].buffer_length = sparam[0].buffer_length; iparam[0].buffer_length = sparam[0].buffer_length;
@ -475,32 +514,36 @@ tbrm_write_consistency_metadata(
// Fetch result // Fetch result
myerrno = mysql_stmt_fetch(sstmt); myerrno = mysql_stmt_fetch(sstmt);
if (myerrno == 1) { if (myerrno != 0 && myerrno != MYSQL_NO_DATA) {
tbrm_stmt_error(sstmt, "Error: Could not fetch result set", __FILE__, __LINE__); tbrm_stmt_error(sstmt, "Error: Could not fetch result set", __FILE__, __LINE__);
goto error_exit; goto error_exit;
} }
// If fetch returned 0, it means that this table, serverid // If fetch returned 0 rows, it means that this table, serverid
// pair was found from metadata, we might need to update // pair was found from metadata, we might need to update
// the consistency information. // the consistency information.
if (myerrno == 0 && binlogpos != tbrm_meta[i]->binlog_pos) { if (myerrno == 0) {
// Update the consistency information // We update the consistency if and only if the
binlogpos = tbrm_meta[i]->binlog_pos; // binlog position for this table has changed
if (binlogpos != tbrm_meta[i]->binlog_pos) {
// Update the consistency information
binlogpos = tbrm_meta[i]->binlog_pos;
// Bind param structure to statement // Bind param structure to statement
if (mysql_stmt_bind_param(ustmt, uparam) != 0) { if (mysql_stmt_bind_param(ustmt, uparam) != 0) {
tbrm_stmt_error(ustmt, "Error: Could not bind update parameters", __FILE__, __LINE__); tbrm_stmt_error(ustmt, "Error: Could not bind update parameters", __FILE__, __LINE__);
goto error_exit; goto error_exit;
} }
// Execute!! // Execute!!
if (mysql_stmt_execute(ustmt) != 0) { if (mysql_stmt_execute(ustmt) != 0) {
tbrm_stmt_error(ustmt, "Error: Could not execute update statement", __FILE__, __LINE__); tbrm_stmt_error(ustmt, "Error: Could not execute update statement", __FILE__, __LINE__);
goto error_exit; goto error_exit;
} }
if (tbr_debug) { if (tbr_debug) {
skygw_log_write_flush(NULL, LOGFILE_TRACE, skygw_log_write_flush(NULL, LOGFILE_TRACE,
(char *)"TRC Debug: Metadata state updated for %s in server %d is binlog_pos %lu gtid '%s'", (char *)"TRC Debug: Metadata state updated for %s in server %d is binlog_pos %lu gtid '%s'",
dbtable, serverid, binlogpos, gtid); dbtable, serverid, binlogpos, gtid);
}
} }
} else { } else {
@ -576,13 +619,16 @@ tbrm_read_server_metadata(
boost::uint64_t nrows=0; boost::uint64_t nrows=0;
boost::uint64_t i=0; boost::uint64_t i=0;
MYSQL_RES *result = NULL; MYSQL_RES *result = NULL;
tbr_server_t *ts=NULL;
tbrm_create_metadata(master_host, user, passwd, master_port); tbrm_create_metadata(master_host, user, passwd, master_port);
MYSQL *con = mysql_init(NULL); MYSQL *con = mysql_init(NULL);
if (!con) { if (!con) {
// TODO: start to log error and other messages skygw_log_write_flush(NULL, LOGFILE_ERROR,
(char *)"Mysql init failed", mysql_error(con));
return false; return false;
} }
@ -619,24 +665,39 @@ tbrm_read_server_metadata(
nrows = mysql_num_rows(result); nrows = mysql_num_rows(result);
*tbrm_servers = (tbr_server_t*) calloc(nrows, sizeof(tbr_server_t)); ts = (tbr_server_t*) malloc(nrows * sizeof(tbr_server_t));
if(!ts) {
skygw_log_write_flush(NULL, LOGFILE_ERROR,
(char *)"Error: Out of memory");
goto error_exit;
}
*tbrm_rows = nrows; *tbrm_rows = nrows;
*tbrm_servers = ts;
for(i=0;i < nrows; i++) { for(i=0;i < nrows; i++) {
MYSQL_ROW row = mysql_fetch_row(result); MYSQL_ROW row = mysql_fetch_row(result);
unsigned long *lengths = mysql_fetch_lengths(result); unsigned long *lengths = mysql_fetch_lengths(result);
// SERVER_ID // SERVER_ID
tbrm_servers[i]->server_id = atol(row[0]); ts[i].server_id = atol(row[0]);
// BINLOG_POS // BINLOG_POS
tbrm_servers[i]->binlog_pos = atoll(row[1]); ts[i].binlog_pos = atoll(row[1]);
// GTID // GTID
tbrm_servers[i]->gtid = (unsigned char *)malloc((lengths[2])*sizeof(unsigned char)); ts[i].gtid = (unsigned char *)malloc((lengths[2])*sizeof(unsigned char));
memcpy(tbrm_servers[i]->gtid, row[2], lengths[2]);
tbrm_servers[i]->gtid_len = lengths[2]; if (!ts[i].gtid) {
skygw_log_write_flush(NULL, LOGFILE_ERROR,
(char *)"Error: Out of memory");
goto error_exit;
}
memcpy(ts[i].gtid, row[2], lengths[2]);
ts[i].gtid_len = lengths[2];
// GTID_KNOWN // GTID_KNOWN
tbrm_servers[i]->gtid_known = atol(row[3]); ts[i].gtid_known = atol(row[3]);
// SERVER_TYPE // SERVER_TYPE
tbrm_servers[i]->server_type = atol(row[4]); ts[i].server_type = atol(row[4]);
} }
mysql_free_result(result); mysql_free_result(result);
@ -645,6 +706,14 @@ tbrm_read_server_metadata(
return true; return true;
error_exit: error_exit:
if (ts) {
for(size_t k=0;i < i; k++) {
free(ts[k].gtid);
}
free(ts);
*tbrm_rows = 0;
*tbrm_servers = NULL;
}
if (result) { if (result) {
mysql_free_result(result); mysql_free_result(result);
@ -705,7 +774,8 @@ tbrm_write_server_metadata(
MYSQL *con = mysql_init(NULL); MYSQL *con = mysql_init(NULL);
if (!con) { if (!con) {
// TODO: start to log error and other messages skygw_log_write_flush(NULL, LOGFILE_ERROR,
(char *)"Mysql init failed", mysql_error(con));
return false; return false;
} }
@ -762,7 +832,6 @@ tbrm_write_server_metadata(
iparam[0].buffer_type = MYSQL_TYPE_LONG; iparam[0].buffer_type = MYSQL_TYPE_LONG;
iparam[0].buffer = (void *) &serverid; iparam[0].buffer = (void *) &serverid;
iparam[1].buffer_type = MYSQL_TYPE_BLOB; iparam[1].buffer_type = MYSQL_TYPE_BLOB;
iparam[1].buffer = (void *) gtid;
iparam[2].buffer_type = MYSQL_TYPE_LONGLONG; iparam[2].buffer_type = MYSQL_TYPE_LONGLONG;
iparam[2].buffer = (void *) &binlogpos; iparam[2].buffer = (void *) &binlogpos;
iparam[3].buffer_type = MYSQL_TYPE_SHORT; iparam[3].buffer_type = MYSQL_TYPE_SHORT;
@ -771,7 +840,6 @@ tbrm_write_server_metadata(
iparam[4].buffer = (void *) &servertype; iparam[4].buffer = (void *) &servertype;
// Update // Update
uparam[0].buffer_type = MYSQL_TYPE_BLOB; uparam[0].buffer_type = MYSQL_TYPE_BLOB;
uparam[0].buffer = (void *) gtid;
uparam[1].buffer_type = MYSQL_TYPE_LONGLONG; uparam[1].buffer_type = MYSQL_TYPE_LONGLONG;
uparam[1].buffer = (void *) &binlogpos; uparam[1].buffer = (void *) &binlogpos;
uparam[2].buffer_type = MYSQL_TYPE_SHORT; uparam[2].buffer_type = MYSQL_TYPE_SHORT;
@ -794,6 +862,8 @@ tbrm_write_server_metadata(
serverid = tbrm_servers[i]->server_id; serverid = tbrm_servers[i]->server_id;
servertype = tbrm_servers[i]->server_type; servertype = tbrm_servers[i]->server_type;
iparam[1].buffer = (void *) gtid;
uparam[0].buffer = (void *) gtid;
uparam[0].buffer_length = tbrm_servers[i]->gtid_len; uparam[0].buffer_length = tbrm_servers[i]->gtid_len;
iparam[1].buffer_length = tbrm_servers[i]->gtid_len; iparam[1].buffer_length = tbrm_servers[i]->gtid_len;
@ -823,32 +893,36 @@ tbrm_write_server_metadata(
// Fetch result // Fetch result
myerrno = mysql_stmt_fetch(sstmt); myerrno = mysql_stmt_fetch(sstmt);
if (myerrno == 1) { if (myerrno != 0 && myerrno != MYSQL_NO_DATA) {
tbrm_stmt_error(sstmt, "Error: Could not fetch result set", __FILE__, __LINE__); tbrm_stmt_error(sstmt, "Error: Could not fetch result set", __FILE__, __LINE__);
goto error_exit; goto error_exit;
} }
// If fetch returned 0, it means that this table, serverid // If fetch returned 0 rows, it means that this table, serverid
// pair was found from metadata, we might need to update // pair was found from metadata, we might need to update
// the consistency information. // the consistency information.
if (myerrno == 0 && binlogpos != tbrm_servers[i]->binlog_pos) { if (myerrno == 0) {
// Update the consistency information // We update the consistency if and only if the
binlogpos = tbrm_servers[i]->binlog_pos; // binlog position for this table has changed
if (binlogpos != tbrm_servers[i]->binlog_pos) {
// Update the consistency information
binlogpos = tbrm_servers[i]->binlog_pos;
// Bind param structure to statement // Bind param structure to statement
if (mysql_stmt_bind_param(ustmt, uparam) != 0) { if (mysql_stmt_bind_param(ustmt, uparam) != 0) {
tbrm_stmt_error(ustmt, "Error: Could not bind update parameters", __FILE__, __LINE__); tbrm_stmt_error(ustmt, "Error: Could not bind update parameters", __FILE__, __LINE__);
goto error_exit; goto error_exit;
} }
// Execute!! // Execute!!
if (mysql_stmt_execute(ustmt) != 0) { if (mysql_stmt_execute(ustmt) != 0) {
tbrm_stmt_error(ustmt, "Error: Could not execute update statement", __FILE__, __LINE__); tbrm_stmt_error(ustmt, "Error: Could not execute update statement", __FILE__, __LINE__);
goto error_exit; goto error_exit;
} }
if (tbr_debug) { if (tbr_debug) {
skygw_log_write_flush(NULL, LOGFILE_TRACE, skygw_log_write_flush(NULL, LOGFILE_TRACE,
(char *)"TRC Debug: Metadata state updated for %s in server %d is binlog_pos %lu gtid '%s'", (char *)"TRC Debug: Metadata state updated for %s in server %d is binlog_pos %lu gtid '%s'",
dbtable, serverid, binlogpos, gtid); dbtable, serverid, binlogpos, gtid);
}
} }
} else { } else {

View File

@ -115,7 +115,7 @@ tbrm_write_server_metadata(
unsigned int master_port, /*!< in: master port */ unsigned int master_port, /*!< in: master port */
tbr_server_t **tbrm_server, /*!< out: table replication server tbr_server_t **tbrm_server, /*!< out: table replication server
metadata. */ metadata. */
size_t *tbrm_rows); /*!< out: number of rows read */ size_t tbrm_rows); /*!< out: number of rows read */
} // table_replication_metadata } // table_replication_metadata

View File

@ -444,7 +444,63 @@ tbr_parser_table_names(
} }
} }
// TODO: Is create table/drop table needed ? // Create/Drop table
if (tbr_match_keyword(&m, "CREATE") &&
tbr_skipto_keyword(&m, "DROP", "")) {
// Eat TEMPORARY keyword
tbr_match_keyword(&m, "TEMPORARY");
// Eat IF NOT EXISTS
tbr_match_keyword(&m, "IF NOT EXISTS");
// Eat IF EXISTS
tbr_match_keyword(&m, "IF EXISTS");
// Eat TABLE keyword
tbr_match_keyword(&m, "TABLE");
dbname = (char *)malloc(len+1);
tbname = (char *)malloc(len+1);
if (tbr_get_tablename(&m, dbname, len, tbname, len)) {
db_name[name_count] = dbname;
table_name[name_count] = tbname;
name_count++;
if (tbr_debug) {
// Table names are delimited by ","
while(tbr_match_const(&m, ",")) {
dbname = (char *)malloc(len+1);
tbname = (char *)malloc(len+1);
// Parse the next db.table name
if (tbr_get_tablename(&m, dbname, len,tbname,len)) {
db_name[name_count] = dbname;
table_name[name_count] = tbname;
name_count++;
if (tbr_debug) {
skygw_log_write_flush(NULL, LOGFILE_TRACE,
(char *)"TRC Debug: DROP TABLE to %s.%s",
dbname, tbname);
}
} else {
free(dbname);
free(tbname);
return (false);
}
}
skygw_log_write_flush(NULL, LOGFILE_TRACE,
(char *)"TRC Debug: CREATE/DROP TABLE to %s.%s",
dbname, tbname);
}
} else {
free(dbname);
free(tbname);
return (false); // Parse error
}
}
*n_tables = name_count; *n_tables = name_count;

View File

@ -0,0 +1,98 @@
#include <getopt.h>
#include <stdlib.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <mysql.h>
#ifndef bool
#define bool int
#endif
#include "table_replication_consistency.h"
#include "../log_manager/log_manager.h"
static char* server_options[] = {
(char *)"jtest",
(char *)"--datadir=/tmp",
(char *)"--skip-innodb",
(char *)"--default-storage-engine=myisam",
NULL
};
const int num_elements = (sizeof(server_options) / sizeof(char *)) - 1;
static char* server_groups[] = { (char *)"libmysqld_server",
(char *)"libmysqld_client",
(char *)"libmysqld_server",
(char *)"libmysqld_server", NULL };
int main(int argc, char** argv)
{
int i=0,k=0;
char *uri;
replication_listener_t *mrl;
int err=0;
char *errstr=NULL;
// This will initialize MySQL
if (mysql_library_init(num_elements, server_options, server_groups)) {
printf("MySQL server init failed\n");
exit(2);
}
mrl = (replication_listener_t*)calloc(argc, sizeof(replication_listener_t));
if (argc < 2) {
printf("Usage: Example <uri> [<uri> ...]\n");
exit(2);
}
for(i=0; i < argc; i++) {
uri= argv[i];
if ( strncmp("mysql://", uri, 8) == 0) {
mrl[k].server_url = (char *)malloc(strlen(uri)+1);
strcpy(mrl[k].server_url, uri);
if (k == 0) {
mrl[k].is_master = 1;
}
k++;
}
}
const char *opts[] = {
(char *)"test",
(char *)"-g",
(char *)"/home/jan/",
NULL
};
skygw_logmanager_init(NULL, 3, (char **)&opts);
err = tb_replication_consistency_init(mrl, k, 5, TBR_TRACE_DEBUG);
if (err ) {
perror(NULL);
exit(1);
}
// This will allow the server to start
for(;;) {
sleep(10);
}
err = tb_replication_consistency_shutdown(&errstr);
if (*errstr) {
fprintf(stderr, "%s\n", errstr);
free(errstr);
}
exit(0);
}