Files
MaxScale/connectors/cdc-connector/cdc_connector.cpp
Markus Mäkelä a3fd5a0218 MXS-2854: Repeat read on schema event
The read needs to be repeated if MaxScale sends a schema event.
2020-01-29 14:35:03 +02:00

768 lines
17 KiB
C++

/* Copyright (c) 2017, MariaDB Corporation. All rights reserved.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
* MA 02110-1301 USA
*/
#include "cdc_connector.h"
#include <arpa/inet.h>
#include <assert.h>
#include <fcntl.h>
#include <iostream>
#include <jansson.h>
#include <netdb.h>
#include <openssl/sha.h>
#include <poll.h>
#include <sstream>
#include <stdexcept>
#include <iterator>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
/** Version string embedded in the REGISTER message */
#define CDC_CONNECTOR_VERSION "1.0.0"
/** Size of the scratch buffers passed to strerror_r() */
#define ERRBUF_SIZE 512
/** Size of the socket read buffers. Parenthesised so that it expands correctly inside larger expressions. */
#define READBUF_SIZE (32 * 1024)
/** Reply that the authentication and registration responses are compared against */
static const char OK_RESPONSE[] = "OK\n";
/** Message written to the socket by Connection::close() */
static const char CLOSE_MSG[] = "CLOSE";
/** Prefix of the registration message; do_registration() appends the format name */
static const char REGISTER_MSG[] = "REGISTER UUID=CDC_CONNECTOR-" CDC_CONNECTOR_VERSION ", TYPE=";
/** Prefix of the data request; connect() appends the table and the optional GTID */
static const char REQUEST_MSG[] = "REQUEST-DATA ";
namespace
{
/**
 * Convert raw bytes into a lowercase hexadecimal string
 *
 * @param data Bytes to convert
 * @param len  Number of bytes in @c data
 *
 * @return Two hexadecimal digits per input byte, high nibble first
 */
static std::string bin2hex(const uint8_t* data, size_t len)
{
    static const char digits[] = "0123456789abcdef";
    std::string hex;
    const uint8_t* const end = data + len;

    for (const uint8_t* p = data; p != end; ++p)
    {
        hex.push_back(digits[*p >> 4]);
        hex.push_back(digits[*p & 0x0f]);
    }

    return hex;
}
std::string generateAuthString(const std::string& user, const std::string& password)
{
uint8_t digest[SHA_DIGEST_LENGTH];
SHA1(reinterpret_cast<const uint8_t*>(password.c_str()), password.length(), digest);
std::string auth_str = user;
auth_str += ":";
std::string part1 = bin2hex((uint8_t*)auth_str.c_str(), auth_str.length());
std::string part2 = bin2hex(digest, sizeof(digest));
return part1 + part2;
}
/**
 * Convert a scalar JSON value into its textual form
 *
 * Strings are returned as-is, integers and reals are formatted with a string
 * stream and booleans become "true" or "false". Every other type, including
 * JSON null, produces an empty string.
 *
 * @param json Value to convert
 *
 * @return The value as a string
 */
std::string json_to_string(json_t* json)
{
    switch (json_typeof(json))
    {
    case JSON_STRING:
        return json_string_value(json);

    case JSON_INTEGER:
        {
            std::stringstream out;
            out << json_integer_value(json);
            return out.str();
        }

    case JSON_REAL:
        {
            std::stringstream out;
            out << json_real_value(json);
            return out.str();
        }

    case JSON_TRUE:
        return "true";

    case JSON_FALSE:
        return "false";

    default:
        // JSON null, objects and arrays have no textual form here
        return std::string();
    }
}
/**
 * Helper class for closing objects
 *
 * Holds a value and passes it to close() when the object is destroyed, unless
 * release() has been called first. How a value is closed is defined by the
 * explicit specializations of close() that follow the class.
 *
 * Copying is disabled: two copies would each close the same value.
 */
template<class T>
class Closer
{
public:
    /**
     * Take responsibility for closing a value
     *
     * @param t Value that is closed when this object is destroyed
     */
    explicit Closer(T t)
        : m_t(t)
        , m_close(true)
    {
    }

    Closer(const Closer&) = delete;
    Closer& operator=(const Closer&) = delete;

    ~Closer()
    {
        if (m_close)
        {
            close(m_t);
        }
    }

    /**
     * Release the stored value
     *
     * Releasing the value prevents it from being closed when the class is deleted
     *
     * @return A copy of the stored value
     */
    T release()
    {
        m_close = false;
        return m_t;
    }

private:
    T    m_t;       /**< The stored value */
    bool m_close;   /**< Whether the destructor still has to close m_t */

    /** Type-specific cleanup; only the specializations below are defined */
    void close(T t);
};

/** Address lists returned by getaddrinfo() are released with freeaddrinfo() */
template<>
void Closer<struct addrinfo*>::close(struct addrinfo* ai)
{
    freeaddrinfo(ai);
}

/** File descriptors are closed with the global close() */
template<>
void Closer<int>::close(int fd)
{
    ::close(fd);
}
}
namespace CDC
{
/** Error text that read_row() stores in m_error when a read returns no data; read_schema() compares m_error against it */
const char* const TIMEOUT = "Request timed out";
/**
* Public functions
*/
/**
 * Create a connection object without opening a socket
 *
 * The arguments are only stored; m_fd starts as -1 and m_connected as false.
 *
 * @param address  Stored in m_address; passed to getaddrinfo() by connect()
 * @param port     Stored in m_port; used as the TCP port by connect()
 * @param user     Stored in m_user; used by do_auth()
 * @param password Stored in m_password; used by do_auth()
 * @param timeout  Stored in m_timeout; wait_for_event() multiplies it by 1000
 *                 before handing it to poll(), so the unit is seconds
 */
Connection::Connection(const std::string& address,
uint16_t port,
const std::string& user,
const std::string& password,
int timeout)
: m_fd(-1)
, m_port(port)
, m_address(address)
, m_user(user)
, m_password(password)
, m_timeout(timeout)
, m_connected(false)
{
}
/**
 * Destroy the connection object
 *
 * Calls close(), which closes the socket if one is open.
 */
Connection::~Connection()
{
close();
}
/**
 * Connect to MaxScale and request a data stream
 *
 * Resolves the stored address, opens a non-blocking TCP socket, performs the
 * authentication and registration exchanges, sends the REQUEST-DATA message
 * and reads the first schema event.
 *
 * If any step after the socket has been stored in the object fails, the
 * socket is closed again and the object returns to the unconnected state so
 * that connect() can be retried.
 *
 * @param table Table name appended to the REQUEST-DATA message
 * @param gtid  Optional GTID appended after the table; ignored when empty
 *
 * @return True when the schema event was read. On failure the reason is
 *         stored in m_error.
 */
bool Connection::connect(const std::string& table, const std::string& gtid)
{
    bool rval = false;
    // Set once this call has stored a socket in m_fd; used to undo that on failure
    bool opened = false;

    try
    {
        if (m_connected)
        {
            m_error = "Already connected";
            return false;
        }

        m_error.clear();

        struct addrinfo* ai = NULL;
        struct addrinfo hint = {};
        hint.ai_socktype = SOCK_STREAM;
        // The socket created below is AF_INET and the address is copied into a
        // sockaddr_in, so only IPv4 results can be used. Asking for AF_UNSPEC
        // could return an IPv6 address that does not fit into sockaddr_in.
        hint.ai_family = AF_INET;
        hint.ai_flags = AI_ALL;

        int gai_rc = getaddrinfo(m_address.c_str(), NULL, &hint, &ai);

        if (gai_rc != 0 || ai == NULL)
        {
            // getaddrinfo() reports failures through its return value, not errno
            m_error = "Invalid address (";
            m_error += m_address;
            m_error += "): ";
            m_error += gai_rc != 0 ? gai_strerror(gai_rc) : "No addresses found";
            return false;
        }

        Closer<struct addrinfo*> c_ai(ai);

        struct sockaddr_in remote = {};

        if (ai->ai_addrlen > sizeof(remote))
        {
            m_error = "Invalid address (";
            m_error += m_address;
            m_error += "): Unsupported address family";
            return false;
        }

        memcpy(&remote, ai->ai_addr, ai->ai_addrlen);
        remote.sin_port = htons(m_port);
        remote.sin_family = AF_INET;

        int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);

        if (fd == -1)
        {
            char err[ERRBUF_SIZE];
            m_error = "Failed to create socket: ";
            m_error += strerror_r(errno, err, sizeof(err));
            return false;
        }

        Closer<int> c_fd(fd);
        int fl;

        if (::connect(fd, (struct sockaddr*)&remote, sizeof(remote)) == -1)
        {
            char err[ERRBUF_SIZE];
            m_error = "Failed to connect: ";
            m_error += strerror_r(errno, err, sizeof(err));
        }
        else if ((fl = fcntl(fd, F_GETFL, 0)) == -1
                 || fcntl(fd, F_SETFL, fl | O_NONBLOCK) == -1)
        {
            char err[ERRBUF_SIZE];
            m_error = "Failed to set socket non-blocking: ";
            m_error += strerror_r(errno, err, sizeof(err));
        }
        else
        {
            // The helper functions below operate on m_fd
            m_fd = c_fd.release();
            m_connected = true;
            opened = true;

            if (do_auth() && do_registration())
            {
                std::string req_msg(REQUEST_MSG);
                req_msg += table;

                if (gtid.length())
                {
                    req_msg += " ";
                    req_msg += gtid;
                }

                if (nointr_write(req_msg.c_str(), req_msg.length()) == -1)
                {
                    char err[ERRBUF_SIZE];
                    m_error = "Failed to write request: ";
                    m_error += strerror_r(errno, err, sizeof(err));
                }
                else if (read_schema())
                {
                    rval = true;
                }
            }
        }
    }
    catch (const std::exception& ex)
    {
        m_error = "Exception caught: ";
        m_error += ex.what();
    }

    if (opened && !rval)
    {
        // The handshake failed. Close the socket directly instead of calling
        // close(), which would clear the error message, and drop whatever was
        // buffered from the dead connection so that a retry starts clean.
        ::close(m_fd);
        m_fd = -1;
        m_connected = false;
        m_buffer.clear();
    }

    return rval;
}
void Connection::close()
{
m_error.clear();
if (m_fd != -1)
{
nointr_write(CLOSE_MSG, sizeof(CLOSE_MSG) - 1);
::close(m_fd);
m_fd = -1;
}
}
/**
 * Check whether a JSON object looks like a schema event
 *
 * @param json Parsed JSON object
 *
 * @return True if the object has a non-empty "fields" array whose first
 *         element has a "name" member
 */
static inline bool is_schema(json_t* json)
{
    json_t* fields = json_object_get(json, "fields");

    if (!fields || !json_is_array(fields) || json_array_size(fields) == 0)
    {
        return false;
    }

    json_t* first = json_array_get(fields, 0);
    return json_object_get(first, "name") != NULL;
}
/**
 * Extract the column names and types from a schema event
 *
 * Walks the "fields" array and replaces m_keys and m_types with one entry per
 * field. The type is taken from "real_type" and falls back to "type"; a
 * positive integer "length" is appended to it in parentheses.
 *
 * @param json Schema event, as recognised by is_schema()
 */
void Connection::process_schema(json_t* json)
{
    SValueVector keys(new ValueVector);
    SValueVector types(new ValueVector);
    json_t* arr = json_object_get(json, "fields");
    size_t i;
    json_t* v;

    json_array_foreach(arr, i, v)
    {
        json_t* name = json_object_get(v, "name");
        json_t* type = json_object_get(v, "real_type");
        json_t* length = json_object_get(v, "length");

        if (type == NULL)
        {
            // Use the Avro type for generated columns
            type = json_object_get(v, "type");
        }

        // json_string_value() returns NULL for non-string values, which must
        // not reach the std::string constructor
        std::string nameval = json_is_string(name) ? json_string_value(name) : "";

        // NOTE(review): a "type" that is present but not a string is reported
        // as varchar(50); presumably this covers non-scalar Avro types - confirm
        std::string typeval =
            type ? (json_is_string(type) ? json_string_value(type) : "varchar(50)") : "undefined";

        if (json_is_integer(length))
        {
            // Keep the full width of the JSON integer instead of narrowing to int
            json_int_t l = json_integer_value(length);

            if (l > 0)
            {
                std::stringstream ss;
                ss << "(" << l << ")";
                typeval += ss.str();
            }
        }

        keys->push_back(nameval);
        types->push_back(typeval);
    }

    m_keys = keys;
    m_types = types;
}
/**
 * Convert a data event into a row
 *
 * Looks up every key of the current schema in the JSON object and converts
 * the values to strings. Columns whose value is JSON null are recorded by
 * their index.
 *
 * @param js Parsed data event
 *
 * @return The row, or an empty pointer if a key of the schema is missing from
 *         the event, in which case m_error names the key
 */
SRow Connection::process_row(json_t* js)
{
    std::set<size_t> null_columns;
    ValueVector fields;
    fields.reserve(m_keys->size());
    m_error.clear();

    for (ValueVector::iterator key = m_keys->begin(); key != m_keys->end(); ++key)
    {
        json_t* value = json_object_get(js, key->c_str());

        if (value == NULL)
        {
            m_error = "No value for key found: ";
            m_error += *key;
            return SRow();
        }

        if (json_is_null(value))
        {
            // The index of the value that is about to be appended
            null_columns.insert(fields.size());
        }

        fields.push_back(json_to_string(value));
    }

    return SRow(new Row(m_keys, m_types, fields, null_columns));
}
/**
 * Read the schema event that starts a data stream
 *
 * Reads one line, parses it as JSON and, if it is a schema event, stores the
 * raw text in m_schema and the column information via process_schema().
 *
 * @return True if a schema event was read. Otherwise m_error describes the
 *         failure and includes the data that was received.
 */
bool Connection::read_schema()
{
    m_error.clear();
    bool rval = false;
    std::string row;

    if (read_row(row))
    {
        json_error_t err;
        json_t* js = json_loads(row.c_str(), JSON_ALLOW_NUL, &err);

        if (js)
        {
            if (is_schema(js))
            {
                m_schema = row;
                process_schema(js);
                rval = true;
            }
            else
            {
                // Valid JSON that is not a schema used to fail without any
                // explanation in m_error
                m_error = "Expected a schema event. Data received so far: ";
                m_error += row;
            }

            json_decref(js);
        }
        else
        {
            m_error = "Failed to parse JSON: ";
            m_error += err.text;
            m_error += ". Data received so far: ";
            m_error += row;
        }
    }

    if (m_error == CDC::TIMEOUT)
    {
        // read_row() gave up before a complete line arrived; show the partial data
        assert(rval == false);
        m_error += ". Data received so far: '";
        std::copy(m_buffer.begin(), m_buffer.end(), std::back_inserter(m_error));
        m_error += "'";
    }

    return rval;
}
/**
 * Read the next row from the stream
 *
 * Schema events that arrive in the middle of the stream update the stored
 * schema and the read is repeated until a data event is received.
 *
 * @return The next row, or an empty pointer if reading, parsing or the row
 *         conversion failed
 */
SRow Connection::read()
{
    m_error.clear();
    std::string line;

    while (read_row(line))
    {
        json_error_t parse_err;
        json_t* event = json_loads(line.c_str(), JSON_ALLOW_NUL, &parse_err);

        if (event == NULL)
        {
            m_error = "Failed to parse JSON: ";
            m_error += parse_err.text;
            break;
        }

        if (!is_schema(event))
        {
            SRow result = process_row(event);
            json_decref(event);
            return result;
        }

        // The schema changed: remember it and go on to the next event
        m_schema = line;
        process_schema(event);
        json_decref(event);
    }

    return SRow();
}
/**
* Private functions
*/
/**
 * Authenticate with MaxScale
 *
 * Sends the string produced by generateAuthString() and expects a reply that
 * starts with OK_RESPONSE.
 *
 * @return True if the reply was OK. On failure m_error describes the problem.
 */
bool Connection::do_auth()
{
    bool rval = false;
    std::string auth_str = generateAuthString(m_user, m_password);

    /** Send the auth string */
    int rc = nointr_write(auth_str.c_str(), auth_str.length());

    if (rc <= 0)
    {
        char err[ERRBUF_SIZE];
        m_error = "Failed to write authentication data: ";
        m_error += rc == -1 ? strerror_r(errno, err, sizeof(err)) : "Write timeout";
    }
    else
    {
        /** Read the response */
        const int ok_len = sizeof(OK_RESPONSE) - 1;
        char buf[READBUF_SIZE];
        // One byte is kept free for the terminator that is written on failure
        int bytes = nointr_read(buf, sizeof(buf) - 1);

        if (bytes == -1)
        {
            char err[ERRBUF_SIZE];
            m_error = "Failed to read authentication response: ";
            m_error += strerror_r(errno, err, sizeof(err));
        }
        else if (bytes < ok_len || memcmp(buf, OK_RESPONSE, ok_len) != 0)
        {
            // Only compare when enough bytes arrived; the rest of buf is uninitialised
            buf[bytes] = '\0';
            m_error = "Authentication failed: ";
            m_error += bytes > 0 ? buf : "Request timed out";
        }
        else
        {
            rval = true;
        }
    }

    return rval;
}
bool Connection::do_registration()
{
bool rval = false;
std::string reg_msg(REGISTER_MSG);
reg_msg += "JSON";
/** Send the registration message */
if (nointr_write(reg_msg.c_str(), reg_msg.length()) == -1)
{
char err[ERRBUF_SIZE];
m_error = "Failed to write registration message: ";
m_error += strerror_r(errno, err, sizeof(err));
}
else
{
/** Read the response */
char buf[READBUF_SIZE];
int bytes;
if ((bytes = nointr_read(buf, sizeof(buf))) == -1)
{
char err[ERRBUF_SIZE];
m_error = "Failed to read registration response: ";
m_error += strerror_r(errno, err, sizeof(err));
}
else if (memcmp(buf, OK_RESPONSE, sizeof(OK_RESPONSE) - 1) != 0)
{
buf[bytes] = '\0';
m_error = "Registration failed: ";
m_error += buf;
}
else
{
rval = true;
}
}
return rval;
}
/**
 * Check whether the buffered data is an error message
 *
 * @return True if the buffer starts with "ERR", in which case m_error is set
 *         to a message that contains the whole buffer
 */
bool Connection::is_error()
{
    const bool has_err_prefix = m_buffer.size() >= 3
        && m_buffer[0] == 'E'
        && m_buffer[1] == 'R'
        && m_buffer[2] == 'R';

    if (!has_err_prefix)
    {
        return false;
    }

    m_error = "MaxScale responded with an error: ";
    m_error.append(m_buffer.begin(), m_buffer.end());
    return true;
}
bool Connection::read_row(std::string& dest)
{
bool rval = true;
while (true)
{
if (!m_buffer.empty())
{
if (is_error())
{
rval = false;
break;
}
std::deque<char>::iterator it = std::find(m_buffer.begin(), m_buffer.end(), '\n');
if (it != m_buffer.end())
{
dest.assign(m_buffer.begin(), it);
m_buffer.erase(m_buffer.begin(), std::next(it));
assert(m_buffer.empty() || m_buffer[0] != '\n');
break;
}
}
char buf[READBUF_SIZE + 1];
int rc = nointr_read(&buf, sizeof(buf));
if (rc == -1)
{
rval = false;
char err[ERRBUF_SIZE];
m_error = "Failed to read row: ";
m_error += strerror_r(errno, err, sizeof(err));
break;
}
else if (rc == 0)
{
rval = false;
m_error = CDC::TIMEOUT;
break;
}
assert(std::find(m_buffer.begin(), m_buffer.end(), '\n') == m_buffer.end());
std::copy(buf, buf + rc, std::back_inserter(m_buffer));
if (is_error())
{
rval = false;
break;
}
}
return rval;
}
/** True if any of the poll() error flags is set in e. The argument is parenthesised so that expressions such as (a | b) are tested as a whole. */
#define is_poll_error(e) (((e) & (POLLERR | POLLHUP | POLLNVAL)) != 0)
/**
 * Describe a set of poll() event flags
 *
 * @param event Bitmask of poll() events
 *
 * @return The names of the flags that are set, each followed by a space
 */
static std::string event_to_string(int event)
{
    struct Flag
    {
        int         mask;
        const char* label;
    };

    // The order of the table is the order of the names in the output
    static const Flag flags[] =
    {
        {POLLIN,    "POLLIN "   },
        {POLLPRI,   "POLLPRI "  },
        {POLLOUT,   "POLLOUT "  },
#ifdef POLLRDHUP
        {POLLRDHUP, "POLLRDHUP "},
#endif
        {POLLERR,   "POLLERR "  },
        {POLLHUP,   "POLLHUP "  },
        {POLLNVAL,  "POLLNVAL " }
    };

    std::string text;

    for (size_t i = 0; i < sizeof(flags) / sizeof(flags[0]); ++i)
    {
        if (event & flags[i].mask)
        {
            text += flags[i].label;
        }
    }

    return text;
}
/**
 * Wait until the socket is ready for the requested events
 *
 * The wait uses poll() with a timeout of m_timeout * 1000 and is restarted
 * when it is interrupted by a signal.
 *
 * @param events poll() events to wait for
 *
 * @return A positive value if an event arrived, 0 if the wait timed out and a
 *         negative value if poll() failed or reported an error condition on
 *         the socket
 */
int Connection::wait_for_event(short events)
{
    struct pollfd pfd;
    pfd.fd = m_fd;
    pfd.events = events;

    int rc;

    do
    {
        rc = poll(&pfd, 1, m_timeout * 1000);
    }
    while (rc < 0 && errno == EINTR);

    if (rc < 0)
    {
        char err[ERRBUF_SIZE];
        m_error = "Failed to wait for event: ";
        m_error += strerror_r(errno, err, sizeof(err));
    }
    else if (rc > 0 && is_poll_error(pfd.revents))
    {
        rc = -1;
        // NOTE(review): this appends to m_error where the other paths assign - confirm that is intended
        m_error += "Error when waiting event; ";
        m_error += event_to_string(pfd.revents);
    }

    return rc;
}
/**
 * Read from the socket, retrying when interrupted by a signal
 *
 * @param dest Destination buffer
 * @param size Size of the destination buffer
 *
 * @return Number of bytes read, -1 if the read failed with an error other
 *         than EAGAIN/EWOULDBLOCK, and 0 in every other case: the wait for
 *         data did not succeed or the read returned no data
 */
int Connection::nointr_read(void* dest, size_t size)
{
    if (wait_for_event(POLLIN) <= 0)
    {
        return 0;
    }

    int rc;

    do
    {
        rc = ::read(m_fd, dest, size);
    }
    while (rc < 0 && errno == EINTR);

    if (rc > 0)
    {
        return rc;
    }

    if (rc == -1 && errno != EWOULDBLOCK && errno != EAGAIN)
    {
        char err[ERRBUF_SIZE];
        m_error = "Failed to read data: ";
        m_error += strerror_r(errno, err, sizeof(err));
        return -1;
    }

    return 0;
}
/**
 * Write to the socket, retrying when interrupted by a signal
 *
 * The socket is non-blocking, so a write can accept only a part of the data.
 * The function keeps writing, waiting for the socket to become writable
 * between attempts, until everything has been written or the wait fails.
 *
 * @param src  Data to write
 * @param size Number of bytes to write
 *
 * @return Number of bytes written, which is less than @c size if the wait for
 *         the socket timed out or failed, or -1 if the write failed with an
 *         error other than EAGAIN/EWOULDBLOCK
 */
int Connection::nointr_write(const void* src, size_t size)
{
    const uint8_t* ptr = static_cast<const uint8_t*>(src);
    int n_bytes = 0;

    do
    {
        ssize_t rc;

        while ((rc = ::write(m_fd, ptr, size)) < 0 && errno == EINTR)
        {
        }

        if (rc > 0)
        {
            n_bytes += rc;
            ptr += rc;
            size -= rc;
        }
        else if (rc < 0 && errno != EWOULDBLOCK && errno != EAGAIN)
        {
            char err[ERRBUF_SIZE];
            m_error = "Failed to write data: ";
            m_error += strerror_r(errno, err, sizeof(err));
            return -1;
        }
    }
    // size is what remains to be written. Comparing it against the number of
    // bytes already written would end the loop early after a partial write.
    while (size > 0 && wait_for_event(POLLOUT) > 0);

    return n_bytes;
}
}