From 96d9ec1b6deca1ee5b77b8ec31b2be3ddf7afb97 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Tue, 31 Oct 2017 08:50:38 +0200 Subject: [PATCH] Update the CDC connector The CDC connector now uses a non-blocking socket for the reads. This allows the possibility of adding read timeouts. Added some utility functions for dealing with GTIDs and delayed the reading of the first row. --- .../cdc_connector/cdc_connector.cpp | 108 ++++++++++-------- .../cdc_connector/cdc_connector.h | 34 +++++- 2 files changed, 92 insertions(+), 50 deletions(-) diff --git a/maxscale-system-test/cdc_connector/cdc_connector.cpp b/maxscale-system-test/cdc_connector/cdc_connector.cpp index 5f497f220..daea65d4a 100644 --- a/maxscale-system-test/cdc_connector/cdc_connector.cpp +++ b/maxscale-system-test/cdc_connector/cdc_connector.cpp @@ -1,6 +1,7 @@ #include "cdc_connector.h" #include +#include #include #include #include @@ -9,6 +10,7 @@ #include #include #include +#include #define CDC_CONNECTOR_VERSION "1.0.0" @@ -24,28 +26,10 @@ static const char REQUEST_MSG[] = "REQUEST-DATA "; namespace { -static inline int nointr_read(int fd, void *dest, size_t size) +static inline void millisleep(int millis) { - int rc = read(fd, dest, size); - - while (rc == -1 && errno == EINTR) - { - rc = read(fd, dest, size); - } - - return rc; -} - -static inline int nointr_write(int fd, const void *src, size_t size) -{ - int rc = write(fd, src, size); - - while (rc == -1 && errno == EINTR) - { - rc = write(fd, src, size); - } - - return rc; + struct timespec ts = {0, 1000000}; + nanosleep(&ts, NULL); } static std::string bin2hex(const uint8_t *data, size_t len) @@ -133,7 +117,8 @@ Connection::Connection(const std::string& address, m_port(port), m_user(user), m_password(password), - m_flags(flags) + m_flags(flags), + m_running(true) { } @@ -167,6 +152,15 @@ bool Connection::createConnection() } m_fd = fd; + int fl; + + if ((fl = fcntl(fd, F_GETFL, 0)) == -1 || + fcntl(fd, F_SETFL, fl | O_NONBLOCK) == -1) + { + char err[ERRBUF_SIZE]; + m_error = "Failed to set socket non-blocking: "; + m_error += strerror_r(errno, err, sizeof (err)); + } if (connect(fd, (struct sockaddr*) &remote, sizeof (remote)) == -1) { @@ -187,7 +181,7 @@ void Connection::closeConnection() { if (m_fd != -1) { - nointr_write(m_fd, CLOSE_MSG, sizeof (CLOSE_MSG) - 1); + nointr_write(CLOSE_MSG, sizeof (CLOSE_MSG) - 1); close(m_fd); m_fd = -1; } @@ -206,27 +200,13 @@ bool Connection::requestData(const std::string& table, const std::string& gtid) req_msg += gtid; } - if (nointr_write(m_fd, req_msg.c_str(), req_msg.length()) == -1) + if (nointr_write(req_msg.c_str(), req_msg.length()) == -1) { rval = false; char err[ERRBUF_SIZE]; m_error = "Failed to write request: "; m_error += strerror_r(errno, err, sizeof (err)); } - else - { - // Read the first row to know if data request was successful - Row row = read(); - - if (row) - { - m_first_row = row; - } - else - { - rval = false; - } - } return rval; } @@ -258,8 +238,14 @@ void Connection::processSchema(json_t* json) { json_t* name = json_object_get(v, "name"); json_t* type = json_object_get(v, "real_type"); + if (type == NULL) + { + // Use the Avro type for generated columns + type = json_object_get(v, "type"); + } std::string nameval = name ? json_string_value(name) : ""; - std::string typeval = type ? json_string_value(type) : "undefined"; + std::string typeval = type ? (json_is_string(type) ? json_string_value(type) : "char(50)") : "undefined"; + m_keys.push_back(nameval); m_types.push_back(typeval); } @@ -302,11 +288,7 @@ Row Connection::read() Row rval; std::string row; - if (m_first_row) - { - rval.swap(m_first_row); - } - else if (readRow(row)) + if (readRow(row)) { json_error_t err; json_t* js = json_loads(row.c_str(), JSON_ALLOW_NUL, &err); @@ -315,6 +297,7 @@ Row Connection::read() { if (is_schema(js)) { + m_schema = row; processSchema(js); rval = Connection::read(); } @@ -345,7 +328,7 @@ bool Connection::doAuth() std::string auth_str = generateAuthString(m_user, m_password); /** Send the auth string */ - if (nointr_write(m_fd, auth_str.c_str(), auth_str.length()) == -1) + if (nointr_write(auth_str.c_str(), auth_str.length()) == -1) { char err[ERRBUF_SIZE]; m_error = "Failed to write authentication data: "; @@ -357,7 +340,7 @@ bool Connection::doAuth() char buf[READBUF_SIZE]; int bytes; - if ((bytes = nointr_read(m_fd, buf, sizeof (buf))) == -1) + if ((bytes = nointr_read(buf, sizeof (buf))) == -1) { char err[ERRBUF_SIZE]; m_error = "Failed to read authentication response: "; @@ -397,7 +380,7 @@ bool Connection::doRegistration() reg_msg += type; /** Send the registration message */ - if (nointr_write(m_fd, reg_msg.c_str(), reg_msg.length()) == -1) + if (nointr_write(reg_msg.c_str(), reg_msg.length()) == -1) { char err[ERRBUF_SIZE]; m_error = "Failed to write registration message: "; @@ -409,7 +392,7 @@ bool Connection::doRegistration() char buf[READBUF_SIZE]; int bytes; - if ((bytes = nointr_read(m_fd, buf, sizeof (buf))) == -1) + if ((bytes = nointr_read(buf, sizeof (buf))) == -1) { char err[ERRBUF_SIZE]; m_error = "Failed to read registration response: "; @@ -437,7 +420,7 @@ bool Connection::readRow(std::string& dest) while (true) { char buf; - int rc = nointr_read(m_fd, &buf, 1); + int rc = nointr_read(&buf, 1); if (rc == -1) { @@ -469,4 +452,31 @@ bool Connection::readRow(std::string& dest) return rval; } +int Connection::nointr_read(void *dest, size_t size) +{ + int rc = ::read(m_fd, dest, size); + + while (m_running && rc == -1 && (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN)) + { + millisleep(1); + rc = ::read(m_fd, dest, size); + } + + return rc; +} + +int Connection::nointr_write(const void *src, size_t size) +{ + bool wouldblock = false; + int rc = write(m_fd, src, size); + + while (m_running && rc == -1 && (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN)) + { + millisleep(1); + rc = write(m_fd, src, size); + } + + return rc; +} + } diff --git a/maxscale-system-test/cdc_connector/cdc_connector.h b/maxscale-system-test/cdc_connector/cdc_connector.h index f0f8833f1..5f07b4625 100644 --- a/maxscale-system-test/cdc_connector/cdc_connector.h +++ b/maxscale-system-test/cdc_connector/cdc_connector.h @@ -2,6 +2,7 @@ #include #include #include +#include #include #include @@ -17,6 +18,7 @@ class InternalRow; typedef std::tr1::shared_ptr Row; typedef std::vector ValueList; +typedef std::map ValueMap; // A class that represents a CDC connection class Connection @@ -40,6 +42,21 @@ public: { return m_error; } + ValueMap getFields() const + { + ValueMap fields; + + for (size_t i = 0; i < m_keys.size(); i++) + { + fields[m_keys[i]] = m_types[i]; + } + + return fields; + } + void abort() + { + m_running = false; + } private: int m_fd; @@ -52,13 +69,17 @@ private: std::string m_schema; ValueList m_keys; ValueList m_types; - Row m_first_row; + bool m_running; bool doAuth(); bool doRegistration(); bool readRow(std::string& dest); void processSchema(json_t* json); Row processRow(json_t*); + + // Lower-level functions + int nointr_read(void *dest, size_t size); + int nointr_write(const void *src, size_t size); }; // Internal representation of a row, used via the Row type @@ -82,6 +103,17 @@ public: return m_values[it - m_keys.begin()]; } + const std::string gtid() const + { + std::string s; + s += value("domain"); + s += "-"; + s += value("server_id"); + s += "-"; + s += value("sequence"); + return s; + } + const std::string& key(size_t i) const { return m_keys[i];