MXS-1413: Add row processing to cdc_connector

The connector now processes rows into objects which expose the values and
types of each returned row.
This commit is contained in:
Markus Mäkelä
2017-09-14 15:09:39 +03:00
parent 36ec6e443e
commit 3541d6e0a4
4 changed files with 258 additions and 32 deletions

View File

@ -4,6 +4,7 @@
#include <stdexcept>
#include <unistd.h>
#include <string.h>
#include <sstream>
#include <openssl/sha.h>
#include <sys/socket.h>
#include <sys/types.h>
@ -74,6 +75,45 @@ std::string generateAuthString(const std::string& user, const std::string& passw
return part1 + part2;
}
std::string json_to_string(json_t* json)
{
std::stringstream ss;
switch (json_typeof(json))
{
case JSON_STRING:
ss << json_string_value(json);
break;
case JSON_INTEGER:
ss << json_integer_value(json);
break;
case JSON_REAL:
ss << json_real_value(json);
break;
case JSON_TRUE:
ss << "true";
break;
case JSON_FALSE:
ss << "false";
break;
case JSON_NULL:
break;
default:
break;
}
return ss.str();
}
}
namespace CDC
@ -93,7 +133,9 @@ Connection::Connection(const std::string& address,
m_port(port),
m_user(user),
m_password(password),
m_flags(flags) { }
m_flags(flags)
{
}
Connection::~Connection()
{
@ -172,48 +214,103 @@ bool Connection::requestData(const std::string& table, const std::string& gtid)
m_error += strerror_r(errno, err, sizeof (err));
}
if (rval)
return rval;
}
static inline bool is_schema(json_t* json)
{
bool rval = false;
json_t* j = json_object_get(json, "fields");
if (j && json_is_array(j) && json_array_size(j))
{
/** Read the Avro schema */
rval = readRow(m_schema);
rval = json_object_get(json_array_get(j, 0), "name") != NULL;
}
return rval;
}
bool Connection::readRow(std::string& dest)
void Connection::processSchema(json_t* json)
{
bool rval = true;
m_keys.clear();
m_types.clear();
while (true)
json_t* arr = json_object_get(json, "fields");
char* raw = json_dumps(json, 0);
size_t i;
json_t* v;
json_array_foreach(arr, i, v)
{
char buf;
int rc = nointr_read(m_fd, &buf, 1);
json_t* name = json_object_get(v, "name");
json_t* type = json_object_get(v, "real_type");
std::string nameval = name ? json_string_value(name) : "";
std::string typeval = type ? json_string_value(type) : "undefined";
m_keys.push_back(nameval);
m_types.push_back(typeval);
}
}
if (rc == -1)
{
rval = false;
char err[ERRBUF_SIZE];
m_error = "Failed to read row: ";
m_error += strerror_r(errno, err, sizeof (err));
break;
}
Row Connection::processRow(json_t* js)
{
ValueList values;
m_error.clear();
if (buf == '\n')
for (ValueList::iterator it = m_keys.begin();
it != m_keys.end(); it++)
{
json_t* v = json_object_get(js, it->c_str());
if (v)
{
break;
values.push_back(json_to_string(v));
}
else
{
dest += buf;
m_error = "No value for key found: ";
m_error += *it;
break;
}
}
if (dest[0] == 'E' && dest[1] == 'R' & dest[2] == 'R')
Row rval;
if (m_error.empty())
{
rval = Row(new InternalRow(m_keys, m_types, values));
}
return rval;
}
Row Connection::read()
{
Row rval;
std::string row;
if (readRow(row))
{
json_error_t err;
json_t* js = json_loads(row.c_str(), JSON_ALLOW_NUL, &err);
if (js)
{
if (is_schema(js))
{
m_error = "Server responded with an error: ";
m_error += dest;
rval = false;
break;
processSchema(js);
rval = Connection::read();
}
else
{
rval = processRow(js);
}
json_decref(js);
}
else
{
m_error = "Failed to parse JSON: ";
m_error += err.text;
}
}
@ -315,4 +412,43 @@ bool Connection::doRegistration()
return rval;
}
bool Connection::readRow(std::string& dest)
{
bool rval = true;
while (true)
{
char buf;
int rc = nointr_read(m_fd, &buf, 1);
if (rc == -1)
{
rval = false;
char err[ERRBUF_SIZE];
m_error = "Failed to read row: ";
m_error += strerror_r(errno, err, sizeof (err));
break;
}
if (buf == '\n')
{
break;
}
else
{
dest += buf;
if (dest[0] == 'E' && dest[1] == 'R' & dest[2] == 'R')
{
m_error = "Server responded with an error: ";
m_error += dest;
rval = false;
break;
}
}
}
return rval;
}
}