MXS-1881: Expand GTID helper class
Added string conversion methods to the gtid_pos_t class that can be used to store and load a GTID value. Also added the missing rpl.cc file that previously only had the Rpl class constructor in it.
This commit is contained in:
@ -4,7 +4,7 @@ if(AVRO_FOUND AND JANSSON_FOUND)
|
|||||||
|
|
||||||
# The common avrorouter functionality
|
# The common avrorouter functionality
|
||||||
add_library(avro-common SHARED avro.cc ../binlogrouter/binlog_common.cc avro_client.cc
|
add_library(avro-common SHARED avro.cc ../binlogrouter/binlog_common.cc avro_client.cc
|
||||||
avro_schema.cc avro_rbr.cc avro_file.cc avro_converter.cc)
|
avro_schema.cc avro_rbr.cc avro_file.cc avro_converter.cc rpl.cc)
|
||||||
set_target_properties(avro-common PROPERTIES VERSION "1.0.0" LINK_FLAGS -Wl,-z,defs)
|
set_target_properties(avro-common PROPERTIES VERSION "1.0.0" LINK_FLAGS -Wl,-z,defs)
|
||||||
target_link_libraries(avro-common maxscale-common ${JANSSON_LIBRARIES} ${AVRO_LIBRARIES} maxavro lzma)
|
target_link_libraries(avro-common maxscale-common ${JANSSON_LIBRARIES} ${AVRO_LIBRARIES} maxavro lzma)
|
||||||
install_module(avro-common core)
|
install_module(avro-common core)
|
||||||
|
|||||||
@ -149,24 +149,12 @@ static int conv_state_handler(void* data, const char* section, const char* key,
|
|||||||
if (strcmp(section, statefile_section) == 0)
|
if (strcmp(section, statefile_section) == 0)
|
||||||
{
|
{
|
||||||
if (strcmp(key, "gtid") == 0)
|
if (strcmp(key, "gtid") == 0)
|
||||||
{
|
|
||||||
char tempval[strlen(value) + 1];
|
|
||||||
memcpy(tempval, value, sizeof(tempval));
|
|
||||||
char *saved, *domain = strtok_r(tempval, ":-\n", &saved);
|
|
||||||
char *serv_id = strtok_r(NULL, ":-\n", &saved);
|
|
||||||
char *seq = strtok_r(NULL, ":-\n", &saved);
|
|
||||||
char *subseq = strtok_r(NULL, ":-\n", &saved);
|
|
||||||
|
|
||||||
if (domain && serv_id && seq && subseq)
|
|
||||||
{
|
{
|
||||||
gtid_pos_t gtid;
|
gtid_pos_t gtid;
|
||||||
gtid.domain = strtol(domain, NULL, 10);
|
ss_debug(bool rval = )gtid.parse(value);
|
||||||
gtid.server_id = strtol(serv_id, NULL, 10);
|
ss_dassert(rval);
|
||||||
gtid.seq = strtol(seq, NULL, 10);
|
|
||||||
gtid.event_num = strtol(subseq, NULL, 10);
|
|
||||||
router->handler.set_gtid(gtid);
|
router->handler.set_gtid(gtid);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
else if (strcmp(key, "position") == 0)
|
else if (strcmp(key, "position") == 0)
|
||||||
{
|
{
|
||||||
router->current_pos = strtol(value, NULL, 10);
|
router->current_pos = strtol(value, NULL, 10);
|
||||||
|
|||||||
88
server/modules/routing/avrorouter/rpl.cc
Normal file
88
server/modules/routing/avrorouter/rpl.cc
Normal file
@ -0,0 +1,88 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2016 MariaDB Corporation Ab
|
||||||
|
*
|
||||||
|
* Use of this software is governed by the Business Source License included
|
||||||
|
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
|
||||||
|
*
|
||||||
|
* Change Date: 2020-01-01
|
||||||
|
*
|
||||||
|
* On the date above, in accordance with the Business Source License, use
|
||||||
|
* of this software will be governed by version 2 or later of the General
|
||||||
|
* Public License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "rpl.hh"
|
||||||
|
|
||||||
|
#include <maxscale/debug.h>
|
||||||
|
#include <sstream>
|
||||||
|
|
||||||
|
Rpl::Rpl(SERVICE* service, SRowEventHandler handler, gtid_pos_t gtid):
|
||||||
|
m_handler(handler),
|
||||||
|
m_service(service),
|
||||||
|
m_binlog_checksum(0),
|
||||||
|
m_event_types(0),
|
||||||
|
m_gtid(gtid)
|
||||||
|
{
|
||||||
|
/** For detection of CREATE/ALTER TABLE statements */
|
||||||
|
static const char* create_table_regex = "(?i)create[a-z0-9[:space:]_]+table";
|
||||||
|
static const char* alter_table_regex = "(?i)alter[[:space:]]+table";
|
||||||
|
int pcreerr;
|
||||||
|
size_t erroff;
|
||||||
|
m_create_table_re = pcre2_compile((PCRE2_SPTR) create_table_regex, PCRE2_ZERO_TERMINATED,
|
||||||
|
0, &pcreerr, &erroff, NULL);
|
||||||
|
m_alter_table_re = pcre2_compile((PCRE2_SPTR) alter_table_regex, PCRE2_ZERO_TERMINATED,
|
||||||
|
0, &pcreerr, &erroff, NULL);
|
||||||
|
ss_info_dassert(m_create_table_re && m_alter_table_re,
|
||||||
|
"CREATE TABLE and ALTER TABLE regex compilation should not fail");
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
void gtid_pos_t::extract(const REP_HEADER& hdr, uint8_t* ptr)
|
||||||
|
{
|
||||||
|
domain = extract_field(ptr + 8, 32);
|
||||||
|
server_id = hdr.serverid;
|
||||||
|
seq = extract_field(ptr, 64);
|
||||||
|
event_num = 0;
|
||||||
|
timestamp = hdr.timestamp;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool gtid_pos_t::parse(const char* str)
|
||||||
|
{
|
||||||
|
bool rval = false;
|
||||||
|
char buf[strlen(str) + 1];
|
||||||
|
strcpy(buf, str);
|
||||||
|
char *saved, *dom = strtok_r(buf, ":-\n", &saved);
|
||||||
|
char *serv_id = strtok_r(NULL, ":-\n", &saved);
|
||||||
|
char *sequence = strtok_r(NULL, ":-\n", &saved);
|
||||||
|
char *subseq = strtok_r(NULL, ":-\n", &saved);
|
||||||
|
|
||||||
|
if (dom && serv_id && sequence)
|
||||||
|
{
|
||||||
|
domain = strtol(dom, NULL, 10);
|
||||||
|
server_id = strtol(serv_id, NULL, 10);
|
||||||
|
seq = strtol(sequence, NULL, 10);
|
||||||
|
event_num = subseq ? strtol(subseq, NULL, 10) : 0;
|
||||||
|
rval = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return rval;
|
||||||
|
}
|
||||||
|
|
||||||
|
gtid_pos_t gtid_pos_t::from_string(std::string str)
|
||||||
|
{
|
||||||
|
gtid_pos_t gtid;
|
||||||
|
gtid.parse(str.c_str());
|
||||||
|
return gtid;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::string gtid_pos_t::to_string() const
|
||||||
|
{
|
||||||
|
std::stringstream ss;
|
||||||
|
ss << domain << "-" << server_id << "-" << seq;
|
||||||
|
return ss.str();
|
||||||
|
}
|
||||||
|
|
||||||
|
bool gtid_pos_t::empty() const
|
||||||
|
{
|
||||||
|
return timestamp == 0 && domain == 0 && server_id == 0 && seq == 0 && event_num == 0;
|
||||||
|
}
|
||||||
@ -15,6 +15,7 @@
|
|||||||
#include <vector>
|
#include <vector>
|
||||||
#include <cstdint>
|
#include <cstdint>
|
||||||
#include <string>
|
#include <string>
|
||||||
|
#include <sstream>
|
||||||
#include <tr1/memory>
|
#include <tr1/memory>
|
||||||
#include <tr1/unordered_map>
|
#include <tr1/unordered_map>
|
||||||
|
|
||||||
@ -45,14 +46,11 @@ struct gtid_pos_t
|
|||||||
* an event inside a GTID event and it is used to
|
* an event inside a GTID event and it is used to
|
||||||
* rebuild GTID events in the correct order. */
|
* rebuild GTID events in the correct order. */
|
||||||
|
|
||||||
void extract(const REP_HEADER& hdr, uint8_t* ptr)
|
void extract(const REP_HEADER& hdr, uint8_t* ptr);
|
||||||
{
|
bool parse(const char* str);
|
||||||
domain = extract_field(ptr + 8, 32);
|
static gtid_pos_t from_string(std::string str);
|
||||||
server_id = hdr.serverid;
|
std::string to_string() const;
|
||||||
seq = extract_field(ptr, 64);
|
bool empty() const;
|
||||||
event_num = 0;
|
|
||||||
timestamp = hdr.timestamp;
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
/** A single column in a CREATE TABLE statement */
|
/** A single column in a CREATE TABLE statement */
|
||||||
@ -196,7 +194,7 @@ public:
|
|||||||
Rpl& operator=(const Rpl&) = delete;
|
Rpl& operator=(const Rpl&) = delete;
|
||||||
|
|
||||||
// Construct a new replication stream transformer
|
// Construct a new replication stream transformer
|
||||||
Rpl(SERVICE* service, SRowEventHandler event_handler);
|
Rpl(SERVICE* service, SRowEventHandler event_handler, gtid_pos_t = {});
|
||||||
|
|
||||||
// Add a stored TableCreateEvent
|
// Add a stored TableCreateEvent
|
||||||
void add_create(STableCreateEvent create);
|
void add_create(STableCreateEvent create);
|
||||||
|
|||||||
Reference in New Issue
Block a user