diff --git a/table_replication_consistency/table_replication_consistency.cpp b/table_replication_consistency/table_replication_consistency.cpp index 32f72b6e4..2a6dc3c45 100644 --- a/table_replication_consistency/table_replication_consistency.cpp +++ b/table_replication_consistency/table_replication_consistency.cpp @@ -154,7 +154,6 @@ tb_replication_consistency_init( } // Start actual replication listener - /* err = pthread_create( &replication_listener_tid[i], NULL, @@ -165,7 +164,6 @@ tb_replication_consistency_init( errmsg = string(strerror(err)); goto error_handling; } - */ } // Replication listener will use this exception for @@ -345,7 +343,7 @@ err_exit: /***********************************************************************//** This function is to shutdown the replication listener and free all -resources on table consistency. This function (TODO) will store +resources on table consistency. This function will store the current status on metadata to MySQL server. @return 0 on success, error code at failure. */ int diff --git a/table_replication_consistency/table_replication_consistency.h b/table_replication_consistency/table_replication_consistency.h index 2ec92b9aa..92a423818 100644 --- a/table_replication_consistency/table_replication_consistency.h +++ b/table_replication_consistency/table_replication_consistency.h @@ -153,7 +153,7 @@ tb_replication_consistency_reconnect( /***********************************************************************//** This function is to shutdown the replication listener and free all -resources on table consistency. This function (TODO) will store +resources on table consistency. This function will store the current status on metadata to MySQL server. @return 0 on success, error code at failure. */ int diff --git a/table_replication_consistency/table_replication_listener.cpp b/table_replication_consistency/table_replication_listener.cpp index 5c7f5d8e2..cb4baf5d2 100644 --- a/table_replication_consistency/table_replication_listener.cpp +++ b/table_replication_consistency/table_replication_listener.cpp @@ -156,17 +156,26 @@ tbrl_update_consistency( // Need to be protected by mutex to avoid concurrency problems boost::mutex::scoped_lock lock(table_consistency_mutex); - if(table_consistency_map.find(database_dot_table) == table_consistency_map.end()) { + multimap::iterator key = table_consistency_map.find(database_dot_table); + + if( key == table_consistency_map.end()) { not_found = true; } else { // Loop through the consistency values - for(multimap::iterator i = table_consistency_map.find(database_dot_table); + for(multimap::iterator i = key; i != table_consistency_map.end(); ++i) { tc = (*i).second; if (tc->server_id == lheader->server_id) { not_found = false; break; } + + // If the next table name is not anymore the same, + // we can safely exit from the loop, names are ordered + if (strcpy((char *)tc->db_table, (char *)database_dot_table.c_str()) != 0) { + not_found = true; + break; + } } } @@ -219,9 +228,13 @@ tbrl_update_server_status( // Need to be protected by mutex to avoid concurrency problems boost::mutex::scoped_lock lock(table_servers_mutex); - if(table_replication_servers.find(lheader->server_id) == table_replication_servers.end()) { + // Try to find out the servre metadata + map::iterator key = table_replication_servers.find(lheader->server_id); + + if( key == table_replication_servers.end()) { not_found = true; } else { + ts = (*key).second; not_found = false; } @@ -446,6 +459,20 @@ void* tb_replication_listener_reader( database_dot_table= tb_it->second; } + if (tbr_debug) { + skygw_log_write_flush(NULL, LOGFILE_TRACE, + (char *)"TRC Debug: Thread %ld Server %d Binlog_pos %lu event %d" + " : %s dbtable '%s' id %d", + id, + lheader->server_id, + lheader->next_position, + revent->get_event_type(), + get_event_type_str(revent->get_event_type()), + database_dot_table.c_str(), + revent->table_id); + } + + // Update the consistency information tbrl_update_consistency(lheader, database_dot_table, gtid_known, gtid); @@ -503,7 +530,7 @@ void* tb_replication_listener_reader( /***********************************************************************//** This function is to shutdown the replication listener and free all -resources on table consistency. This function (TODO) will store +resources on table consistency. This function will store the current status on metadata to MySQL server. @return 0 on success, error code at failure. */ int @@ -752,6 +779,7 @@ void { master = (replication_listener_t*)arg; tbr_metadata_t **tm=NULL; + tbr_server_t **ts=NULL; bool err = false; // Set up master connect info @@ -761,35 +789,83 @@ void sleep(10); // Sleep ~10 seconds try { - // Need to be protected by mutex to avoid concurrency problems - boost::mutex::scoped_lock lock(table_consistency_mutex); + size_t nelems; - size_t nelems = table_consistency_map.size(); - size_t k =0; + // This scope for scoped mutexing + { + // Need to be protected by mutex to avoid concurrency problems + boost::mutex::scoped_lock lock(table_consistency_mutex); - tm = (tbr_metadata_t**)calloc(nelems, sizeof(tbr_metadata_t*)); + nelems = table_consistency_map.size(); + size_t k =0; - for(multimap::iterator i = table_consistency_map.begin(); - i != table_consistency_map.end(); ++i,++k) { + tm = (tbr_metadata_t**)calloc(nelems, sizeof(tbr_metadata_t*)); - tm[k] = ((*i).second); + if (!tm) { + skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)"Error: TRM: Out of memory"); + goto my_exit; + } + + for(multimap::iterator i = table_consistency_map.begin(); + i != table_consistency_map.end(); ++i,++k) { + + tm[k] = ((*i).second); + } } - // Release mutex - lock.unlock(); // Insert or update metadata information - err = tbrm_write_consistency_metadata( + if (!tbrm_write_consistency_metadata( (const char *)master_host, (const char *)master_user, (const char *)master_passwd, master_port, tm, - nelems); + nelems)) { + goto my_exit; + } free(tm); + tm = NULL; + // This scope for scoped mutexing + { + // Need to be protected by mutex to avoid + // concurrency problems + boost::mutex::scoped_lock lock(table_servers_mutex); + + nelems = table_replication_servers.size(); + size_t k =0; + + ts = (tbr_server_t**)calloc(nelems, sizeof(tbr_server_t*)); + + if (!ts) { + skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)"Error: TRM: Out of memory"); + goto my_exit; + } + + for(map::iterator i = table_replication_servers.begin(); + i != table_replication_servers.end(); ++i,++k) { + + ts[k] = ((*i).second); + + } + } + + // Insert or update metadata information + if (!tbrm_write_server_metadata( + (const char *)master_host, + (const char *)master_user, + (const char *)master_passwd, + master_port, + ts, + nelems)) { + goto my_exit; + } + + free(ts); + ts = NULL; } catch(ListenerException e) { @@ -820,6 +896,15 @@ void } my_exit: + + if (tm) { + free(tm); + } + + if (ts) { + free(ts); + } + if (tbr_trace) { skygw_log_write_flush(NULL, LOGFILE_TRACE, (char *)"Shutting down the metadata updater thread"); } @@ -840,6 +925,7 @@ tb_replication_listener_init( char **error_message) /*!< out: error message */ { tbr_metadata_t *tm = NULL; + tbr_server_t *ts=NULL; size_t tm_rows = 0; std::string dbtable; std::string err; @@ -856,7 +942,7 @@ tb_replication_listener_init( (unsigned int)master_port, &tm, &tm_rows)) { - err = std::string("Error: reading metadata failed"); + err = std::string("Error: reading table consistency metadata failed"); goto error_exit; } @@ -866,6 +952,24 @@ tb_replication_listener_init( table_consistency_map.insert(pair(dbtable, t)); } + + if (!tbrm_read_server_metadata( + (const char *)master_host, + (const char *)master_user, + (const char *)master_passwd, + (unsigned int)master_port, + &ts, + &tm_rows)) { + err = std::string("Error: reading table servers metadata failed"); + goto error_exit; + } + + for(size_t i=0;i < tm_rows; i++) { + tbr_server_t *t = &(ts[i]); + + table_replication_servers.insert(pair(t->server_id, t)); + } + } catch(ListenerException e) { @@ -912,32 +1016,41 @@ tb_replication_listener_done( char **error_message) /*!< out: error message */ { size_t nelems = table_consistency_map.size(); + size_t nelems2 = table_replication_servers.size(); size_t k =0; tbr_metadata_t **tm=NULL; - int err = 0; + tbr_server_t **ts=NULL; + bool err = false; tm = (tbr_metadata_t**)calloc(nelems, sizeof(tbr_metadata_t*)); + ts = (tbr_server_t **)calloc(nelems2, sizeof(tbr_server_t*)); + + if (tm == NULL || ts == NULL) { + skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)"TRM: Out of memory"); + goto error_exit; + } try { + k = 0; for(multimap::iterator i = table_consistency_map.begin(); i != table_consistency_map.end(); ++i,++k) { tm[k] = ((*i).second); } - // Insert or update metadata information - err = tbrm_write_consistency_metadata( + // Insert or update table consistency metadata information + if (!tbrm_write_consistency_metadata( (const char *)master_host, (const char *)master_user, (const char *)master_passwd, (unsigned int)master_port, tm, - (size_t)nelems); - - free(tm); + nelems)) { + goto error_exit; + } // Clean up memory allocation for multimap items for(multimap::iterator i = table_consistency_map.begin(); - i != table_consistency_map.end(); ++i,++k) { + i != table_consistency_map.end(); ++i) { tbr_metadata_t *trm = ((*i).second); free(trm->db_table); @@ -947,6 +1060,34 @@ tb_replication_listener_done( free(trm); } + k=0; + for(map::iterator i = table_replication_servers.begin(); + i != table_replication_servers.end(); ++i,++k) { + ts[k] = ((*i).second); + } + + // Insert or update table server metadata information + if (!tbrm_write_server_metadata( + (const char *)master_host, + (const char *)master_user, + (const char *)master_passwd, + (unsigned int)master_port, + ts, + nelems2)) { + goto error_exit; + } + + // Clean up memory allocation for multimap items + for(map::iterator j = table_replication_servers.begin(); + j != table_replication_servers.end(); ++j) { + tbr_server_t *trs = ((*j).second); + + free(trs->gtid); + + table_replication_servers.erase(j); + free(trs); + } + // Clean up binlog listeners table_replication_listeners.erase(table_replication_listeners.begin(), table_replication_listeners.end()); } @@ -954,27 +1095,45 @@ tb_replication_listener_done( { string err = std::string("Listener exception: ")+ e.what(); skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str()); + goto error_exit; } catch(boost::system::error_code e) { string err = std::string("Listener system exception: ")+ e.message(); skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str()); + goto error_exit; } // Try and catch all exceptions catch(std::exception const& e) { string err = std::string("Listener other exception: ")+ e.what(); skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str()); + goto error_exit; } // Rest of them catch(...) { string err = std::string("Unknown exception: "); skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)err.c_str()); + goto error_exit; } if (tbr_trace) { skygw_log_write_flush(NULL, LOGFILE_TRACE, (char *)"Shutting down the listeners"); + goto error_exit; + } + + free(tm); + free(ts); + + return err; + +error_exit: + if (tm) { + free(tm); + } + if (ts) { + free(ts); } return err; diff --git a/table_replication_consistency/table_replication_metadata.cpp b/table_replication_consistency/table_replication_metadata.cpp index f024fbaf6..600f61f42 100644 --- a/table_replication_consistency/table_replication_metadata.cpp +++ b/table_replication_consistency/table_replication_metadata.cpp @@ -44,14 +44,15 @@ namespace mysql { namespace table_replication_metadata { /***********************************************************************//** +Internal function to write error messages to the log file. */ static void tbrm_report_error( /*==============*/ - MYSQL *con, - const char *message, - const char *file, - int line) + MYSQL *con, /*!< in: MySQL connection */ + const char *message, /*!< in: Error message */ + const char *file, /*!< in: File name */ + int line) /*!< in: Line number */ { skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)"%s at file %s line %d", message, file, line); @@ -63,14 +64,15 @@ tbrm_report_error( } /***********************************************************************//** +Internal function to write statement error messages to the log file. */ static void tbrm_stmt_error( /*============*/ - MYSQL_STMT *stmt, - const char *message, - const char *file, - int line) + MYSQL_STMT *stmt, /*!< in: MySQL statement */ + const char *message, /*!< in: Error message */ + const char *file, /*!< in: File name */ + int line) /*!< in: Line number */ { skygw_log_write_flush(NULL, LOGFILE_ERROR, (char *)"%s at file %s line %d", message, file, line); @@ -93,10 +95,10 @@ create succeeded */ static bool tbrm_create_metadata( /*=================*/ - const char *master_host, - const char *user, - const char *passwd, - unsigned int master_port) + const char *master_host, /*!< in: Master host name */ + const char *user, /*!< in: Username */ + const char *passwd, /*!< in: Passwd */ + unsigned int master_port) /*!< in: Master port */ { MYSQL *con = mysql_init(NULL); unsigned int myerrno=0; @@ -215,19 +217,21 @@ tbrm_read_consistency_metadata( unsigned int master_port, /*!< in: master port */ tbr_metadata_t **tbrm_meta, /*!< out: table replication consistency metadata. */ - size_t *tbrm_rows) /*!< out: number of rows read */ + size_t *tbrm_rows) /*!< out: number of rows read */ { unsigned int myerrno=0; boost::uint64_t nrows=0; boost::uint64_t i=0; MYSQL_RES *result = NULL; + tbr_metadata_t *tm=NULL; tbrm_create_metadata(master_host, user, passwd, master_port); MYSQL *con = mysql_init(NULL); if (!con) { - // TODO: start to log error and other messages + skygw_log_write_flush(NULL, LOGFILE_ERROR, + (char *)"Error: MySQL init failed"); return false; } @@ -264,25 +268,49 @@ tbrm_read_consistency_metadata( nrows = mysql_num_rows(result); - *tbrm_meta = (tbr_metadata_t*) calloc(nrows, sizeof(tbr_metadata_t)); + tm = (tbr_metadata_t*) malloc(nrows * sizeof(tbr_metadata_t)); + + if (!tm) { + skygw_log_write_flush(NULL, LOGFILE_ERROR, + (char *)"Error: Out of memory"); + goto error_exit; + } + + memset(tm, 0, nrows * sizeof(tbr_metadata_t)); *tbrm_rows = nrows; + *tbrm_meta = tm; for(i=0;i < nrows; i++) { MYSQL_ROW row = mysql_fetch_row(result); unsigned long *lengths = mysql_fetch_lengths(result); // DB_TABLE_NAME - tbrm_meta[i]->db_table = (unsigned char *)malloc(lengths[0]); - strcpy((char *)tbrm_meta[i]->db_table, row[0]); + tm[i].db_table = (unsigned char *)malloc(lengths[0]); + + if (!tm[i].db_table) { + skygw_log_write_flush(NULL, LOGFILE_ERROR, + (char *)"Error: Out of memory"); + goto error_exit; + } + + strcpy((char *)tm[i].db_table, row[0]); // SERVER_ID - tbrm_meta[i]->server_id = atol(row[1]); + tm[i].server_id = atol(row[1]); // GTID - tbrm_meta[i]->gtid = (unsigned char *)malloc((lengths[2])*sizeof(unsigned char)); - memcpy(tbrm_meta[i]->gtid, row[2], lengths[2]); - tbrm_meta[i]->gtid_len = lengths[2]; + tm[i].gtid = (unsigned char *)malloc((lengths[2])*sizeof(unsigned char)); + + if (!tm[i].gtid) { + free(tm[i].db_table); + skygw_log_write_flush(NULL, LOGFILE_ERROR, + (char *)"Error: Out of memory"); + goto error_exit; + } + + memcpy(tm[i].gtid, row[2], lengths[2]); + tm[i].gtid_len = lengths[2]; // BINLOG_POS - tbrm_meta[i]->binlog_pos = atoll(row[3]); + tm[i].binlog_pos = atoll(row[3]); // GTID_KNOWN - tbrm_meta[i]->gtid_known = atol(row[4]); + tm[i].gtid_known = atol(row[4]); } mysql_free_result(result); @@ -292,6 +320,16 @@ tbrm_read_consistency_metadata( error_exit: + if (tm) { + for(size_t k=0;i < i; k++) { + free(tm[k].db_table); + free(tm[k].gtid); + } + free(tm); + *tbrm_rows = 0; + *tbrm_meta = NULL; + } + if (result) { mysql_free_result(result); } @@ -316,7 +354,7 @@ tbrm_write_consistency_metadata( unsigned int master_port, /*!< in: master port */ tbr_metadata_t **tbrm_meta, /*!< in: table replication consistency metadata. */ - size_t tbrm_rows) /*!< in: number of rows read */ + size_t tbrm_rows) /*!< in: number of rows read */ { int myerrno=0; boost::uint32_t i; @@ -327,15 +365,15 @@ tbrm_write_consistency_metadata( MYSQL_BIND iparam[5]; MYSQL_BIND uparam[5]; MYSQL_BIND result[1]; - char *dbtable; - void *gtid; + char *dbtable=NULL; + void *gtid=NULL; int gtidknown; int serverid; boost::uint64_t binlogpos; // Query to find out if the row already exists on table const char *sst = "SELECT BINLOG_POS FROM TABLE_REPLICATION_CONSISTENCY WHERE" - " DB_TABLE_NAME='?' and SERVER_ID=?"; + " DB_TABLE_NAME=? and SERVER_ID=?"; // Insert Query const char *ist = "INSERT INTO TABLE_REPLICATION_CONSISTENCY(DB_TABLE_NAME," @@ -350,7 +388,8 @@ tbrm_write_consistency_metadata( MYSQL *con = mysql_init(NULL); if (!con) { - // TODO: start to log error and other messages + skygw_log_write_flush(NULL, LOGFILE_ERROR, + (char *)"Mysql init failed", mysql_error(con)); return false; } @@ -402,29 +441,24 @@ tbrm_write_consistency_metadata( // Init param structure // Select sparam[0].buffer_type = MYSQL_TYPE_VARCHAR; - sparam[0].buffer = (void *) dbtable; sparam[1].buffer_type = MYSQL_TYPE_LONG; sparam[1].buffer = (void *) &serverid; // Insert iparam[0].buffer_type = MYSQL_TYPE_VARCHAR; - iparam[0].buffer = (void *) dbtable; iparam[1].buffer_type = MYSQL_TYPE_LONG; iparam[1].buffer = (void *) &serverid; iparam[2].buffer_type = MYSQL_TYPE_BLOB; - iparam[2].buffer = (void *) gtid; iparam[3].buffer_type = MYSQL_TYPE_LONGLONG; iparam[3].buffer = (void *) &binlogpos; iparam[4].buffer_type = MYSQL_TYPE_SHORT; iparam[4].buffer = (void *) >idknown; // Update uparam[0].buffer_type = MYSQL_TYPE_BLOB; - uparam[0].buffer = (void *) gtid; uparam[1].buffer_type = MYSQL_TYPE_LONGLONG; uparam[1].buffer = (void *) &binlogpos; uparam[2].buffer_type = MYSQL_TYPE_SHORT; uparam[2].buffer = (void *) >idknown; uparam[3].buffer_type = MYSQL_TYPE_VARCHAR; - uparam[3].buffer = (void *) dbtable; uparam[4].buffer_type = MYSQL_TYPE_LONG; uparam[4].buffer = (void *) &serverid; // Result set for select @@ -442,7 +476,12 @@ tbrm_write_consistency_metadata( gtid = (char *)tbrm_meta[i]->gtid; gtidknown = tbrm_meta[i]->gtid_known; serverid = tbrm_meta[i]->server_id; + uparam[3].buffer = (void *) dbtable; + sparam[0].buffer = (void *) dbtable; + uparam[0].buffer = (void *) gtid; + iparam[0].buffer = (void *) dbtable; + iparam[2].buffer = (void *) gtid; sparam[0].buffer_length = strlen(dbtable); uparam[3].buffer_length = sparam[0].buffer_length; iparam[0].buffer_length = sparam[0].buffer_length; @@ -475,32 +514,36 @@ tbrm_write_consistency_metadata( // Fetch result myerrno = mysql_stmt_fetch(sstmt); - if (myerrno == 1) { + if (myerrno != 0 && myerrno != MYSQL_NO_DATA) { tbrm_stmt_error(sstmt, "Error: Could not fetch result set", __FILE__, __LINE__); goto error_exit; } - // If fetch returned 0, it means that this table, serverid + // If fetch returned 0 rows, it means that this table, serverid // pair was found from metadata, we might need to update // the consistency information. - if (myerrno == 0 && binlogpos != tbrm_meta[i]->binlog_pos) { - // Update the consistency information - binlogpos = tbrm_meta[i]->binlog_pos; + if (myerrno == 0) { + // We update the consistency if and only if the + // binlog position for this table has changed + if (binlogpos != tbrm_meta[i]->binlog_pos) { + // Update the consistency information + binlogpos = tbrm_meta[i]->binlog_pos; - // Bind param structure to statement - if (mysql_stmt_bind_param(ustmt, uparam) != 0) { - tbrm_stmt_error(ustmt, "Error: Could not bind update parameters", __FILE__, __LINE__); - goto error_exit; - } - // Execute!! - if (mysql_stmt_execute(ustmt) != 0) { - tbrm_stmt_error(ustmt, "Error: Could not execute update statement", __FILE__, __LINE__); - goto error_exit; - } - if (tbr_debug) { - skygw_log_write_flush(NULL, LOGFILE_TRACE, - (char *)"TRC Debug: Metadata state updated for %s in server %d is binlog_pos %lu gtid '%s'", - dbtable, serverid, binlogpos, gtid); + // Bind param structure to statement + if (mysql_stmt_bind_param(ustmt, uparam) != 0) { + tbrm_stmt_error(ustmt, "Error: Could not bind update parameters", __FILE__, __LINE__); + goto error_exit; + } + // Execute!! + if (mysql_stmt_execute(ustmt) != 0) { + tbrm_stmt_error(ustmt, "Error: Could not execute update statement", __FILE__, __LINE__); + goto error_exit; + } + if (tbr_debug) { + skygw_log_write_flush(NULL, LOGFILE_TRACE, + (char *)"TRC Debug: Metadata state updated for %s in server %d is binlog_pos %lu gtid '%s'", + dbtable, serverid, binlogpos, gtid); + } } } else { @@ -576,13 +619,16 @@ tbrm_read_server_metadata( boost::uint64_t nrows=0; boost::uint64_t i=0; MYSQL_RES *result = NULL; + tbr_server_t *ts=NULL; tbrm_create_metadata(master_host, user, passwd, master_port); MYSQL *con = mysql_init(NULL); if (!con) { - // TODO: start to log error and other messages + skygw_log_write_flush(NULL, LOGFILE_ERROR, + (char *)"Mysql init failed", mysql_error(con)); + return false; } @@ -619,24 +665,39 @@ tbrm_read_server_metadata( nrows = mysql_num_rows(result); - *tbrm_servers = (tbr_server_t*) calloc(nrows, sizeof(tbr_server_t)); + ts = (tbr_server_t*) malloc(nrows * sizeof(tbr_server_t)); + + if(!ts) { + skygw_log_write_flush(NULL, LOGFILE_ERROR, + (char *)"Error: Out of memory"); + goto error_exit; + } + *tbrm_rows = nrows; + *tbrm_servers = ts; for(i=0;i < nrows; i++) { MYSQL_ROW row = mysql_fetch_row(result); unsigned long *lengths = mysql_fetch_lengths(result); // SERVER_ID - tbrm_servers[i]->server_id = atol(row[0]); + ts[i].server_id = atol(row[0]); // BINLOG_POS - tbrm_servers[i]->binlog_pos = atoll(row[1]); + ts[i].binlog_pos = atoll(row[1]); // GTID - tbrm_servers[i]->gtid = (unsigned char *)malloc((lengths[2])*sizeof(unsigned char)); - memcpy(tbrm_servers[i]->gtid, row[2], lengths[2]); - tbrm_servers[i]->gtid_len = lengths[2]; + ts[i].gtid = (unsigned char *)malloc((lengths[2])*sizeof(unsigned char)); + + if (!ts[i].gtid) { + skygw_log_write_flush(NULL, LOGFILE_ERROR, + (char *)"Error: Out of memory"); + goto error_exit; + } + + memcpy(ts[i].gtid, row[2], lengths[2]); + ts[i].gtid_len = lengths[2]; // GTID_KNOWN - tbrm_servers[i]->gtid_known = atol(row[3]); + ts[i].gtid_known = atol(row[3]); // SERVER_TYPE - tbrm_servers[i]->server_type = atol(row[4]); + ts[i].server_type = atol(row[4]); } mysql_free_result(result); @@ -645,6 +706,14 @@ tbrm_read_server_metadata( return true; error_exit: + if (ts) { + for(size_t k=0;i < i; k++) { + free(ts[k].gtid); + } + free(ts); + *tbrm_rows = 0; + *tbrm_servers = NULL; + } if (result) { mysql_free_result(result); @@ -705,7 +774,8 @@ tbrm_write_server_metadata( MYSQL *con = mysql_init(NULL); if (!con) { - // TODO: start to log error and other messages + skygw_log_write_flush(NULL, LOGFILE_ERROR, + (char *)"Mysql init failed", mysql_error(con)); return false; } @@ -762,7 +832,6 @@ tbrm_write_server_metadata( iparam[0].buffer_type = MYSQL_TYPE_LONG; iparam[0].buffer = (void *) &serverid; iparam[1].buffer_type = MYSQL_TYPE_BLOB; - iparam[1].buffer = (void *) gtid; iparam[2].buffer_type = MYSQL_TYPE_LONGLONG; iparam[2].buffer = (void *) &binlogpos; iparam[3].buffer_type = MYSQL_TYPE_SHORT; @@ -771,7 +840,6 @@ tbrm_write_server_metadata( iparam[4].buffer = (void *) &servertype; // Update uparam[0].buffer_type = MYSQL_TYPE_BLOB; - uparam[0].buffer = (void *) gtid; uparam[1].buffer_type = MYSQL_TYPE_LONGLONG; uparam[1].buffer = (void *) &binlogpos; uparam[2].buffer_type = MYSQL_TYPE_SHORT; @@ -794,6 +862,8 @@ tbrm_write_server_metadata( serverid = tbrm_servers[i]->server_id; servertype = tbrm_servers[i]->server_type; + iparam[1].buffer = (void *) gtid; + uparam[0].buffer = (void *) gtid; uparam[0].buffer_length = tbrm_servers[i]->gtid_len; iparam[1].buffer_length = tbrm_servers[i]->gtid_len; @@ -823,32 +893,36 @@ tbrm_write_server_metadata( // Fetch result myerrno = mysql_stmt_fetch(sstmt); - if (myerrno == 1) { + if (myerrno != 0 && myerrno != MYSQL_NO_DATA) { tbrm_stmt_error(sstmt, "Error: Could not fetch result set", __FILE__, __LINE__); goto error_exit; } - // If fetch returned 0, it means that this table, serverid + // If fetch returned 0 rows, it means that this table, serverid // pair was found from metadata, we might need to update // the consistency information. - if (myerrno == 0 && binlogpos != tbrm_servers[i]->binlog_pos) { - // Update the consistency information - binlogpos = tbrm_servers[i]->binlog_pos; + if (myerrno == 0) { + // We update the consistency if and only if the + // binlog position for this table has changed + if (binlogpos != tbrm_servers[i]->binlog_pos) { + // Update the consistency information + binlogpos = tbrm_servers[i]->binlog_pos; - // Bind param structure to statement - if (mysql_stmt_bind_param(ustmt, uparam) != 0) { - tbrm_stmt_error(ustmt, "Error: Could not bind update parameters", __FILE__, __LINE__); - goto error_exit; - } - // Execute!! - if (mysql_stmt_execute(ustmt) != 0) { - tbrm_stmt_error(ustmt, "Error: Could not execute update statement", __FILE__, __LINE__); - goto error_exit; - } - if (tbr_debug) { - skygw_log_write_flush(NULL, LOGFILE_TRACE, - (char *)"TRC Debug: Metadata state updated for %s in server %d is binlog_pos %lu gtid '%s'", - dbtable, serverid, binlogpos, gtid); + // Bind param structure to statement + if (mysql_stmt_bind_param(ustmt, uparam) != 0) { + tbrm_stmt_error(ustmt, "Error: Could not bind update parameters", __FILE__, __LINE__); + goto error_exit; + } + // Execute!! + if (mysql_stmt_execute(ustmt) != 0) { + tbrm_stmt_error(ustmt, "Error: Could not execute update statement", __FILE__, __LINE__); + goto error_exit; + } + if (tbr_debug) { + skygw_log_write_flush(NULL, LOGFILE_TRACE, + (char *)"TRC Debug: Metadata state updated for %s in server %d is binlog_pos %lu gtid '%s'", + dbtable, serverid, binlogpos, gtid); + } } } else { diff --git a/table_replication_consistency/table_replication_metadata.h b/table_replication_consistency/table_replication_metadata.h index 23ac8499e..b7da7047e 100644 --- a/table_replication_consistency/table_replication_metadata.h +++ b/table_replication_consistency/table_replication_metadata.h @@ -115,7 +115,7 @@ tbrm_write_server_metadata( unsigned int master_port, /*!< in: master port */ tbr_server_t **tbrm_server, /*!< out: table replication server metadata. */ - size_t *tbrm_rows); /*!< out: number of rows read */ + size_t tbrm_rows); /*!< out: number of rows read */ } // table_replication_metadata diff --git a/table_replication_consistency/table_replication_parser.cpp b/table_replication_consistency/table_replication_parser.cpp index 56684f09b..f1372a56f 100644 --- a/table_replication_consistency/table_replication_parser.cpp +++ b/table_replication_consistency/table_replication_parser.cpp @@ -444,7 +444,63 @@ tbr_parser_table_names( } } - // TODO: Is create table/drop table needed ? + // Create/Drop table + if (tbr_match_keyword(&m, "CREATE") && + tbr_skipto_keyword(&m, "DROP", "")) { + + // Eat TEMPORARY keyword + tbr_match_keyword(&m, "TEMPORARY"); + + // Eat IF NOT EXISTS + tbr_match_keyword(&m, "IF NOT EXISTS"); + + // Eat IF EXISTS + tbr_match_keyword(&m, "IF EXISTS"); + + // Eat TABLE keyword + tbr_match_keyword(&m, "TABLE"); + + dbname = (char *)malloc(len+1); + tbname = (char *)malloc(len+1); + + if (tbr_get_tablename(&m, dbname, len, tbname, len)) { + db_name[name_count] = dbname; + table_name[name_count] = tbname; + name_count++; + + if (tbr_debug) { + // Table names are delimited by "," + while(tbr_match_const(&m, ",")) { + dbname = (char *)malloc(len+1); + tbname = (char *)malloc(len+1); + // Parse the next db.table name + if (tbr_get_tablename(&m, dbname, len,tbname,len)) { + db_name[name_count] = dbname; + table_name[name_count] = tbname; + name_count++; + + if (tbr_debug) { + skygw_log_write_flush(NULL, LOGFILE_TRACE, + (char *)"TRC Debug: DROP TABLE to %s.%s", + dbname, tbname); + } + } else { + free(dbname); + free(tbname); + return (false); + } + } + skygw_log_write_flush(NULL, LOGFILE_TRACE, + (char *)"TRC Debug: CREATE/DROP TABLE to %s.%s", + dbname, tbname); + } + } else { + free(dbname); + free(tbname); + return (false); // Parse error + } + } + *n_tables = name_count; diff --git a/table_replication_consistency/test/Example b/table_replication_consistency/test/Example index 3dfeb89b7..356658560 100755 Binary files a/table_replication_consistency/test/Example and b/table_replication_consistency/test/Example differ diff --git a/table_replication_consistency/test/test.cpp b/table_replication_consistency/test/test.cpp new file mode 100644 index 000000000..e2f8714ea --- /dev/null +++ b/table_replication_consistency/test/test.cpp @@ -0,0 +1,98 @@ +#include +#include +#include +#include +#include +#include +#ifndef bool +#define bool int +#endif +#include "table_replication_consistency.h" +#include "../log_manager/log_manager.h" + +static char* server_options[] = { + (char *)"jtest", + (char *)"--datadir=/tmp", + (char *)"--skip-innodb", + (char *)"--default-storage-engine=myisam", + NULL +}; + +const int num_elements = (sizeof(server_options) / sizeof(char *)) - 1; + +static char* server_groups[] = { (char *)"libmysqld_server", + (char *)"libmysqld_client", + (char *)"libmysqld_server", + (char *)"libmysqld_server", NULL }; + + +int main(int argc, char** argv) +{ + + int i=0,k=0; + char *uri; + replication_listener_t *mrl; + int err=0; + char *errstr=NULL; + + // This will initialize MySQL + if (mysql_library_init(num_elements, server_options, server_groups)) { + printf("MySQL server init failed\n"); + exit(2); + } + + + mrl = (replication_listener_t*)calloc(argc, sizeof(replication_listener_t)); + + if (argc < 2) { + printf("Usage: Example [ ...]\n"); + exit(2); + } + + for(i=0; i < argc; i++) { + uri= argv[i]; + + if ( strncmp("mysql://", uri, 8) == 0) { + + mrl[k].server_url = (char *)malloc(strlen(uri)+1); + strcpy(mrl[k].server_url, uri); + + if (k == 0) { + mrl[k].is_master = 1; + } + k++; + + } + } + + const char *opts[] = { + (char *)"test", + (char *)"-g", + (char *)"/home/jan/", + NULL + }; + + skygw_logmanager_init(NULL, 3, (char **)&opts); + + err = tb_replication_consistency_init(mrl, k, 5, TBR_TRACE_DEBUG); + + if (err ) { + perror(NULL); + exit(1); + } + + // This will allow the server to start + for(;;) { + sleep(10); + } + + err = tb_replication_consistency_shutdown(&errstr); + + if (*errstr) { + fprintf(stderr, "%s\n", errstr); + free(errstr); + } + + exit(0); + +}