diff --git a/maxscale-system-test/cdc_connector/CMakeLists.txt b/maxscale-system-test/cdc_connector/CMakeLists.txt index 6cf309e9e..88f3d56fe 100644 --- a/maxscale-system-test/cdc_connector/CMakeLists.txt +++ b/maxscale-system-test/cdc_connector/CMakeLists.txt @@ -1,5 +1,10 @@ include(cmake/BuildJansson.cmake) +set(CMAKE_CXX_FLAGS "-std=c++0x") +set(CMAKE_CXX_FLAGS_DEBUG "-std=c++0x -ggdb") +set(CMAKE_CXX_FLAGS_RELEASE "-std=c++0x -O2") +set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "-std=c++0x -O2") + include_directories(${JANSSON_INCLUDE_DIR}) add_library(cdc_connector STATIC cdc_connector.cpp) target_link_libraries(cdc_connector ${JANSSON_LIBRARIES}) diff --git a/maxscale-system-test/cdc_connector/cdc_connector.cpp b/maxscale-system-test/cdc_connector/cdc_connector.cpp index b6eee5d5b..407b3564d 100644 --- a/maxscale-system-test/cdc_connector/cdc_connector.cpp +++ b/maxscale-system-test/cdc_connector/cdc_connector.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -74,6 +75,45 @@ std::string generateAuthString(const std::string& user, const std::string& passw return part1 + part2; } + + +std::string json_to_string(json_t* json) +{ + std::stringstream ss; + + switch (json_typeof(json)) + { + case JSON_STRING: + ss << json_string_value(json); + break; + + case JSON_INTEGER: + ss << json_integer_value(json); + break; + + case JSON_REAL: + ss << json_real_value(json); + break; + + case JSON_TRUE: + ss << "true"; + break; + + case JSON_FALSE: + ss << "false"; + break; + + case JSON_NULL: + break; + + default: + break; + + } + + return ss.str(); +} + } namespace CDC @@ -93,7 +133,9 @@ Connection::Connection(const std::string& address, m_port(port), m_user(user), m_password(password), - m_flags(flags) { } + m_flags(flags) +{ +} Connection::~Connection() { @@ -172,48 +214,103 @@ bool Connection::requestData(const std::string& table, const std::string& gtid) m_error += strerror_r(errno, err, sizeof (err)); } - if (rval) + return rval; +} + +static inline bool is_schema(json_t* json) +{ + bool rval = false; + json_t* j = json_object_get(json, "fields"); + + if (j && json_is_array(j) && json_array_size(j)) { - /** Read the Avro schema */ - rval = readRow(m_schema); + rval = json_object_get(json_array_get(j, 0), "name") != NULL; } return rval; } -bool Connection::readRow(std::string& dest) +void Connection::processSchema(json_t* json) { - bool rval = true; + m_keys.clear(); + m_types.clear(); - while (true) + json_t* arr = json_object_get(json, "fields"); + char* raw = json_dumps(json, 0); + size_t i; + json_t* v; + + json_array_foreach(arr, i, v) { - char buf; - int rc = nointr_read(m_fd, &buf, 1); + json_t* name = json_object_get(v, "name"); + json_t* type = json_object_get(v, "real_type"); + std::string nameval = name ? json_string_value(name) : ""; + std::string typeval = type ? json_string_value(type) : "undefined"; + m_keys.push_back(nameval); + m_types.push_back(typeval); + } +} - if (rc == -1) - { - rval = false; - char err[ERRBUF_SIZE]; - m_error = "Failed to read row: "; - m_error += strerror_r(errno, err, sizeof (err)); - break; - } +Row Connection::processRow(json_t* js) +{ + ValueList values; + m_error.clear(); - if (buf == '\n') + for (ValueList::iterator it = m_keys.begin(); + it != m_keys.end(); it++) + { + json_t* v = json_object_get(js, it->c_str()); + + if (v) { - break; + values.push_back(json_to_string(v)); } else { - dest += buf; + m_error = "No value for key found: "; + m_error += *it; + break; + } + } - if (dest[0] == 'E' && dest[1] == 'R' & dest[2] == 'R') + Row rval; + + if (m_error.empty()) + { + rval = Row(new InternalRow(m_keys, m_types, values)); + } + + return rval; +} + +Row Connection::read() +{ + Row rval; + std::string row; + + if (readRow(row)) + { + json_error_t err; + json_t* js = json_loads(row.c_str(), JSON_ALLOW_NUL, &err); + + if (js) + { + if (is_schema(js)) { - m_error = "Server responded with an error: "; - m_error += dest; - rval = false; - break; + processSchema(js); + rval = Connection::read(); } + else + { + rval = processRow(js); + } + + json_decref(js); + } + else + { + m_error = "Failed to parse JSON: "; + m_error += err.text; } } @@ -315,4 +412,43 @@ bool Connection::doRegistration() return rval; } +bool Connection::readRow(std::string& dest) +{ + bool rval = true; + + while (true) + { + char buf; + int rc = nointr_read(m_fd, &buf, 1); + + if (rc == -1) + { + rval = false; + char err[ERRBUF_SIZE]; + m_error = "Failed to read row: "; + m_error += strerror_r(errno, err, sizeof (err)); + break; + } + + if (buf == '\n') + { + break; + } + else + { + dest += buf; + + if (dest[0] == 'E' && dest[1] == 'R' & dest[2] == 'R') + { + m_error = "Server responded with an error: "; + m_error += dest; + rval = false; + break; + } + } + } + + return rval; +} + } diff --git a/maxscale-system-test/cdc_connector/cdc_connector.h b/maxscale-system-test/cdc_connector/cdc_connector.h index 74969bfb9..42b9c57f7 100644 --- a/maxscale-system-test/cdc_connector/cdc_connector.h +++ b/maxscale-system-test/cdc_connector/cdc_connector.h @@ -1,5 +1,9 @@ #include #include +#include +#include +#include +#include /** Request format flags */ #define CDC_REQUEST_TYPE_JSON (1 << 0) @@ -8,6 +12,13 @@ namespace CDC { +// The typedef for the Row type +class InternalRow; +typedef std::tr1::shared_ptr Row; + +typedef std::vector ValueList; + +// A class that represents a CDC connection class Connection { public: @@ -19,7 +30,7 @@ public: virtual ~Connection(); bool createConnection(); bool requestData(const std::string& table, const std::string& gtid = ""); - bool readRow(std::string& dest); + Row read(); void closeConnection(); const std::string& getSchema() const { @@ -39,9 +50,73 @@ private: std::string m_password; std::string m_error; std::string m_schema; + ValueList m_keys; + ValueList m_types; bool doAuth(); bool doRegistration(); + bool readRow(std::string& dest); + void processSchema(json_t* json); + Row processRow(json_t*); +}; + +// Internal representation of a row, used via the Row type +class InternalRow +{ +public: + + size_t fieldCount() const + { + return m_values.size(); + } + + const std::string& value(size_t i) const + { + return m_values[i]; + } + + const std::string& value(const std::string& str) const + { + ValueList::const_iterator it = std::find(m_keys.begin(), m_keys.end(), str); + return m_values[it - m_keys.begin()]; + } + + const std::string& key(size_t i) const + { + return m_keys[i]; + } + + const std::string& type(size_t i) const + { + return m_types[i]; + } + + ~InternalRow() + { + } + +private: + ValueList m_keys; + ValueList m_types; + ValueList m_values; + + // Not intended to be copied + InternalRow(const InternalRow&); + InternalRow& operator=(const InternalRow&); + InternalRow(); + + // Only a Connection should construct an InternalRow + friend class Connection; + + InternalRow(const ValueList& keys, + const ValueList& types, + const ValueList& values): + m_keys(keys), + m_types(types), + m_values(values) + { + } + }; } diff --git a/maxscale-system-test/cdc_datatypes/cdc_datatypes.cpp b/maxscale-system-test/cdc_datatypes/cdc_datatypes.cpp index 7496c429f..b85c48c17 100644 --- a/maxscale-system-test/cdc_datatypes/cdc_datatypes.cpp +++ b/maxscale-system-test/cdc_datatypes/cdc_datatypes.cpp @@ -139,6 +139,16 @@ std::string type_to_table_name(const char* type) return name; } +static std::string unquote(std::string str) +{ + if (str[0] == '\"') + { + str = str.substr(1, str.length() - 2); + } + + return str; +} + bool run_test(TestConnections& test) { bool rval = true; @@ -172,17 +182,17 @@ bool run_test(TestConnections& test) { for (int j = 0; test_set[x].values[j]; j++) { - std::string row; + CDC::Row row; - if (conn.readRow(row)) + if ((row = conn.read())) { - TestInput input(test_set[x].values[j], test_set[x].types[i]); - TestOutput output(row, field_name); + std::string input = unquote(test_set[x].values[j]); + std::string output = row->value(field_name); - if (input != output) + if (input != output && (input != "NULL" || output != "")) { test.tprintf("Result mismatch: %s(%s) => %s", - test_set[x].types[i], test_set[x].values[j], output.getValue().c_str()); + test_set[x].types[i], input.c_str(), output.c_str()); rval = false; } }