diff --git a/maxscale-system-test/cdc_connector/CMakeLists.txt b/maxscale-system-test/cdc_connector/CMakeLists.txt deleted file mode 100644 index ecf1b8e9b..000000000 --- a/maxscale-system-test/cdc_connector/CMakeLists.txt +++ /dev/null @@ -1,14 +0,0 @@ -project(cdc_connector) -cmake_minimum_required(VERSION 2.8) - -include(GNUInstallDirs) - -set(CMAKE_CXX_FLAGS "-fPIC -std=c++0x") -set(CMAKE_CXX_FLAGS_DEBUG "-fPIC -std=c++0x -ggdb") -set(CMAKE_CXX_FLAGS_RELEASE "-fPIC -std=c++0x -O2") -set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "-fPIC -std=c++0x -O2") - -add_library(cdc_connector STATIC cdc_connector.cpp) - -install(TARGETS cdc_connector DESTINATION ${CMAKE_INSTALL_LIBDIR}) -install(FILES cdc_connector.h DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}) diff --git a/maxscale-system-test/cdc_connector/cdc_connector.cpp b/maxscale-system-test/cdc_connector/cdc_connector.cpp deleted file mode 100644 index daea65d4a..000000000 --- a/maxscale-system-test/cdc_connector/cdc_connector.cpp +++ /dev/null @@ -1,482 +0,0 @@ -#include "cdc_connector.h" - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#define CDC_CONNECTOR_VERSION "1.0.0" - -#define ERRBUF_SIZE 512 -#define READBUF_SIZE 1024 - -static const char OK_RESPONSE[] = "OK\n"; - -static const char CLOSE_MSG[] = "CLOSE"; -static const char REGISTER_MSG[] = "REGISTER UUID=CDC_CONNECTOR-" CDC_CONNECTOR_VERSION ", TYPE="; -static const char REQUEST_MSG[] = "REQUEST-DATA "; - -namespace -{ - -static inline void millisleep(int millis) -{ - struct timespec ts = {0, 1000000}; - nanosleep(&ts, NULL); -} - -static std::string bin2hex(const uint8_t *data, size_t len) -{ - std::string result; - static const char hexconvtab[] = "0123456789abcdef"; - - for (int i = 0; i < len; i++) - { - result += hexconvtab[data[i] >> 4]; - result += hexconvtab[data[i] & 0x0f]; - } - - return result; -} - -std::string generateAuthString(const std::string& user, const std::string& password) -{ - uint8_t digest[SHA_DIGEST_LENGTH]; - SHA1(reinterpret_cast (password.c_str()), password.length(), digest); - - std::string auth_str = user; - auth_str += ":"; - - std::string part1 = bin2hex((uint8_t*)auth_str.c_str(), auth_str.length()); - std::string part2 = bin2hex(digest, sizeof(digest)); - - return part1 + part2; -} - - -std::string json_to_string(json_t* json) -{ - std::stringstream ss; - - switch (json_typeof(json)) - { - case JSON_STRING: - ss << json_string_value(json); - break; - - case JSON_INTEGER: - ss << json_integer_value(json); - break; - - case JSON_REAL: - ss << json_real_value(json); - break; - - case JSON_TRUE: - ss << "true"; - break; - - case JSON_FALSE: - ss << "false"; - break; - - case JSON_NULL: - break; - - default: - break; - - } - - return ss.str(); -} - -} - -namespace CDC -{ - -/** - * Public functions - */ - -Connection::Connection(const std::string& address, - uint16_t port, - const std::string& user, - const std::string& password, - uint32_t flags) : - m_fd(-1), - m_address(address), - m_port(port), - m_user(user), - m_password(password), - m_flags(flags), - m_running(true) -{ -} - -Connection::~Connection() -{ - closeConnection(); -} - -bool Connection::createConnection() -{ - bool rval = false; - struct sockaddr_in remote = {}; - - remote.sin_port = htons(m_port); - remote.sin_family = AF_INET; - - if (inet_aton(m_address.c_str(), (struct in_addr*)&remote.sin_addr.s_addr) == 0) - { - m_error = "Invalid address: "; - m_error += m_address; - } - else - { - int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); - - if (fd == -1) - { - char err[ERRBUF_SIZE]; - m_error = "Failed to create socket: "; - m_error += strerror_r(errno, err, sizeof (err)); - } - - m_fd = fd; - int fl; - - if ((fl = fcntl(fd, F_GETFL, 0)) == -1 || - fcntl(fd, F_SETFL, fl | O_NONBLOCK) == -1) - { - char err[ERRBUF_SIZE]; - m_error = "Failed to set socket non-blocking: "; - m_error += strerror_r(errno, err, sizeof (err)); - } - - if (connect(fd, (struct sockaddr*) &remote, sizeof (remote)) == -1) - { - char err[ERRBUF_SIZE]; - m_error = "Failed to connect: "; - m_error += strerror_r(errno, err, sizeof (err)); - } - else if (doAuth()) - { - rval = doRegistration(); - } - } - - return rval; -} - -void Connection::closeConnection() -{ - if (m_fd != -1) - { - nointr_write(CLOSE_MSG, sizeof (CLOSE_MSG) - 1); - close(m_fd); - m_fd = -1; - } -} - -bool Connection::requestData(const std::string& table, const std::string& gtid) -{ - bool rval = true; - - std::string req_msg(REQUEST_MSG); - req_msg += table; - - if (gtid.length()) - { - req_msg += " "; - req_msg += gtid; - } - - if (nointr_write(req_msg.c_str(), req_msg.length()) == -1) - { - rval = false; - char err[ERRBUF_SIZE]; - m_error = "Failed to write request: "; - m_error += strerror_r(errno, err, sizeof (err)); - } - - return rval; -} - -static inline bool is_schema(json_t* json) -{ - bool rval = false; - json_t* j = json_object_get(json, "fields"); - - if (j && json_is_array(j) && json_array_size(j)) - { - rval = json_object_get(json_array_get(j, 0), "name") != NULL; - } - - return rval; -} - -void Connection::processSchema(json_t* json) -{ - m_keys.clear(); - m_types.clear(); - - json_t* arr = json_object_get(json, "fields"); - char* raw = json_dumps(json, 0); - size_t i; - json_t* v; - - json_array_foreach(arr, i, v) - { - json_t* name = json_object_get(v, "name"); - json_t* type = json_object_get(v, "real_type"); - if (type == NULL) - { - // Use the Avro type for generated columns - type = json_object_get(v, "type"); - } - std::string nameval = name ? json_string_value(name) : ""; - std::string typeval = type ? (json_is_string(type) ? json_string_value(type) : "char(50)") : "undefined"; - - m_keys.push_back(nameval); - m_types.push_back(typeval); - } -} - -Row Connection::processRow(json_t* js) -{ - ValueList values; - m_error.clear(); - - for (ValueList::iterator it = m_keys.begin(); - it != m_keys.end(); it++) - { - json_t* v = json_object_get(js, it->c_str()); - - if (v) - { - values.push_back(json_to_string(v)); - } - else - { - m_error = "No value for key found: "; - m_error += *it; - break; - } - } - - Row rval; - - if (m_error.empty()) - { - rval = Row(new InternalRow(m_keys, m_types, values)); - } - - return rval; -} - -Row Connection::read() -{ - Row rval; - std::string row; - - if (readRow(row)) - { - json_error_t err; - json_t* js = json_loads(row.c_str(), JSON_ALLOW_NUL, &err); - - if (js) - { - if (is_schema(js)) - { - m_schema = row; - processSchema(js); - rval = Connection::read(); - } - else - { - rval = processRow(js); - } - - json_decref(js); - } - else - { - m_error = "Failed to parse JSON: "; - m_error += err.text; - } - } - - return rval; -} - -/** - * Private functions - */ - -bool Connection::doAuth() -{ - bool rval = false; - std::string auth_str = generateAuthString(m_user, m_password); - - /** Send the auth string */ - if (nointr_write(auth_str.c_str(), auth_str.length()) == -1) - { - char err[ERRBUF_SIZE]; - m_error = "Failed to write authentication data: "; - m_error += strerror_r(errno, err, sizeof (err)); - } - else - { - /** Read the response */ - char buf[READBUF_SIZE]; - int bytes; - - if ((bytes = nointr_read(buf, sizeof (buf))) == -1) - { - char err[ERRBUF_SIZE]; - m_error = "Failed to read authentication response: "; - m_error += strerror_r(errno, err, sizeof (err)); - } - else if (memcmp(buf, OK_RESPONSE, sizeof (OK_RESPONSE) - 1) != 0) - { - buf[bytes] = '\0'; - m_error = "Authentication failed: "; - m_error += buf; - } - else - { - rval = true; - } - } - - return rval; -} - -bool Connection::doRegistration() -{ - bool rval = false; - std::string reg_msg(REGISTER_MSG); - - const char *type = ""; - - if (m_flags & CDC_REQUEST_TYPE_JSON) - { - type = "JSON"; - } - else if (m_flags & CDC_REQUEST_TYPE_AVRO) - { - type = "AVRO"; - } - - reg_msg += type; - - /** Send the registration message */ - if (nointr_write(reg_msg.c_str(), reg_msg.length()) == -1) - { - char err[ERRBUF_SIZE]; - m_error = "Failed to write registration message: "; - m_error += strerror_r(errno, err, sizeof (err)); - } - else - { - /** Read the response */ - char buf[READBUF_SIZE]; - int bytes; - - if ((bytes = nointr_read(buf, sizeof (buf))) == -1) - { - char err[ERRBUF_SIZE]; - m_error = "Failed to read registration response: "; - m_error += strerror_r(errno, err, sizeof (err)); - } - else if (memcmp(buf, OK_RESPONSE, sizeof (OK_RESPONSE) - 1) != 0) - { - buf[bytes] = '\0'; - m_error = "Registration failed: "; - m_error += buf; - } - else - { - rval = true; - } - } - - return rval; -} - -bool Connection::readRow(std::string& dest) -{ - bool rval = true; - - while (true) - { - char buf; - int rc = nointr_read(&buf, 1); - - if (rc == -1) - { - rval = false; - char err[ERRBUF_SIZE]; - m_error = "Failed to read row: "; - m_error += strerror_r(errno, err, sizeof (err)); - break; - } - - if (buf == '\n') - { - break; - } - else - { - dest += buf; - - if (dest[0] == 'E' && dest[1] == 'R' & dest[2] == 'R') - { - m_error = "Server responded with an error: "; - m_error += dest; - rval = false; - break; - } - } - } - - return rval; -} - -int Connection::nointr_read(void *dest, size_t size) -{ - int rc = ::read(m_fd, dest, size); - - while (m_running && rc == -1 && (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN)) - { - millisleep(1); - rc = ::read(m_fd, dest, size); - } - - return rc; -} - -int Connection::nointr_write(const void *src, size_t size) -{ - bool wouldblock = false; - int rc = write(m_fd, src, size); - - while (m_running && rc == -1 && (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN)) - { - millisleep(1); - rc = write(m_fd, src, size); - } - - return rc; -} - -} diff --git a/maxscale-system-test/cdc_connector/cdc_connector.h b/maxscale-system-test/cdc_connector/cdc_connector.h deleted file mode 100644 index 5f07b4625..000000000 --- a/maxscale-system-test/cdc_connector/cdc_connector.h +++ /dev/null @@ -1,155 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include - -/** Request format flags */ -#define CDC_REQUEST_TYPE_JSON (1 << 0) -#define CDC_REQUEST_TYPE_AVRO (1 << 1) - -namespace CDC -{ - -// The typedef for the Row type -class InternalRow; -typedef std::tr1::shared_ptr Row; - -typedef std::vector ValueList; -typedef std::map ValueMap; - -// A class that represents a CDC connection -class Connection -{ -public: - Connection(const std::string& address, - uint16_t port, - const std::string& user, - const std::string& password, - uint32_t flags = CDC_REQUEST_TYPE_JSON); - virtual ~Connection(); - bool createConnection(); - bool requestData(const std::string& table, const std::string& gtid = ""); - Row read(); - void closeConnection(); - const std::string& getSchema() const - { - return m_schema; - } - const std::string& getError() const - { - return m_error; - } - ValueMap getFields() const - { - ValueMap fields; - - for (size_t i = 0; i < m_keys.size(); i++) - { - fields[m_keys[i]] = m_types[i]; - } - - return fields; - } - void abort() - { - m_running = false; - } - -private: - int m_fd; - uint32_t m_flags; - uint16_t m_port; - std::string m_address; - std::string m_user; - std::string m_password; - std::string m_error; - std::string m_schema; - ValueList m_keys; - ValueList m_types; - bool m_running; - - bool doAuth(); - bool doRegistration(); - bool readRow(std::string& dest); - void processSchema(json_t* json); - Row processRow(json_t*); - - // Lower-level functions - int nointr_read(void *dest, size_t size); - int nointr_write(const void *src, size_t size); -}; - -// Internal representation of a row, used via the Row type -class InternalRow -{ -public: - - size_t fieldCount() const - { - return m_values.size(); - } - - const std::string& value(size_t i) const - { - return m_values[i]; - } - - const std::string& value(const std::string& str) const - { - ValueList::const_iterator it = std::find(m_keys.begin(), m_keys.end(), str); - return m_values[it - m_keys.begin()]; - } - - const std::string gtid() const - { - std::string s; - s += value("domain"); - s += "-"; - s += value("server_id"); - s += "-"; - s += value("sequence"); - return s; - } - - const std::string& key(size_t i) const - { - return m_keys[i]; - } - - const std::string& type(size_t i) const - { - return m_types[i]; - } - - ~InternalRow() - { - } - -private: - ValueList m_keys; - ValueList m_types; - ValueList m_values; - - // Not intended to be copied - InternalRow(const InternalRow&); - InternalRow& operator=(const InternalRow&); - InternalRow(); - - // Only a Connection should construct an InternalRow - friend class Connection; - - InternalRow(const ValueList& keys, - const ValueList& types, - const ValueList& values): - m_keys(keys), - m_types(types), - m_values(values) - { - } - -}; - -} diff --git a/maxscale-system-test/cdc_datatypes/cdc_datatypes.cpp b/maxscale-system-test/cdc_datatypes/cdc_datatypes.cpp index b85c48c17..6beda311e 100644 --- a/maxscale-system-test/cdc_datatypes/cdc_datatypes.cpp +++ b/maxscale-system-test/cdc_datatypes/cdc_datatypes.cpp @@ -178,7 +178,7 @@ bool run_test(TestConnections& test) std::string name = type_to_table_name(test_set[x].types[i]); CDC::Connection conn(test.maxscale_IP, 4001, "skysql", "skysql"); - if (conn.createConnection() && conn.requestData(name)) + if (conn.connect(name)) { for (int j = 0; test_set[x].values[j]; j++) { @@ -198,14 +198,14 @@ bool run_test(TestConnections& test) } else { - std::string err = conn.getError(); + std::string err = conn.error(); test.tprintf("Failed to read data: %s", err.c_str()); } } } else { - std::string err = conn.getError(); + std::string err = conn.error(); test.tprintf("Failed to request data: %s", err.c_str()); rval = false; break; diff --git a/maxscale-system-test/cmake/BuildJansson.cmake b/maxscale-system-test/cmake/BuildJansson.cmake new file mode 100644 index 000000000..a052e3394 --- /dev/null +++ b/maxscale-system-test/cmake/BuildJansson.cmake @@ -0,0 +1,20 @@ +# Download and build the Jansson library + +set(JANSSON_REPO "https://github.com/akheron/jansson.git" CACHE INTERNAL "Jansson Git repository") + +# Release 2.9 of Jansson +set(JANSSON_TAG "v2.9" CACHE INTERNAL "Jansson Git tag") + +ExternalProject_Add(jansson + GIT_REPOSITORY ${JANSSON_REPO} + GIT_TAG ${JANSSON_TAG} + CMAKE_ARGS -DCMAKE_INSTALL_PREFIX=${CMAKE_BINARY_DIR}/jansson/install -DCMAKE_C_FLAGS=-fPIC -DJANSSON_BUILD_DOCS=OFF + BINARY_DIR ${CMAKE_BINARY_DIR}/jansson + INSTALL_DIR ${CMAKE_BINARY_DIR}/jansson/install + UPDATE_COMMAND "") + +set(JANSSON_FOUND TRUE CACHE INTERNAL "") +set(JANSSON_STATIC_FOUND TRUE CACHE INTERNAL "") +set(JANSSON_INCLUDE_DIR ${CMAKE_BINARY_DIR}/jansson/install/include CACHE INTERNAL "") +set(JANSSON_STATIC_LIBRARIES ${CMAKE_BINARY_DIR}/jansson/install/lib/libjansson.a CACHE INTERNAL "") +set(JANSSON_LIBRARIES ${JANSSON_STATIC_LIBRARIES} CACHE INTERNAL "") diff --git a/maxscale-system-test/utilities.cmake b/maxscale-system-test/utilities.cmake index fae7464ac..4f5a3aaef 100644 --- a/maxscale-system-test/utilities.cmake +++ b/maxscale-system-test/utilities.cmake @@ -111,10 +111,11 @@ include_directories(${JANSSON_INCLUDE_DIR}) # Build the CDC connector ExternalProject_Add(cdc_connector - SOURCE_DIR ${CMAKE_SOURCE_DIR}/cdc_connector/ - CMAKE_ARGS -DCMAKE_INSTALL_PREFIX=${CMAKE_BINARY_DIR}/cdc_connector/ + GIT_REPOSITORY "https://github.com/mariadb-corporation/maxscale-cdc-connector" + CMAKE_ARGS -DCMAKE_INSTALL_PREFIX=${CMAKE_BINARY_DIR}/cdc_connector/ -DJANSSON_INCLUDE_DIR=${JANSSON_INCLUDE_DIR} BUILD_COMMAND make - INSTALL_COMMAND make install) + INSTALL_COMMAND make install + UPDATE_COMMAND "") add_dependencies(cdc_connector jansson) set(CDC_CONNECTOR_INCLUDE ${CMAKE_BINARY_DIR}/cdc_connector/include/ CACHE INTERNAL "")