Update the CDC connector

The CDC connector now uses a non-blocking socket for the reads. This
allows the possibility of adding read timeouts.

Added some utility functions for dealing with GTIDs and delayed the
reading of the first row.
This commit is contained in:
Markus Mäkelä
2017-10-31 08:50:38 +02:00
committed by Johan Wikman
parent 0bb54511b7
commit 96d9ec1b6d
2 changed files with 92 additions and 50 deletions

View File

@ -1,6 +1,7 @@
#include "cdc_connector.h"
#include <arpa/inet.h>
#include <fcntl.h>
#include <stdexcept>
#include <unistd.h>
#include <string.h>
@ -9,6 +10,7 @@
#include <sys/socket.h>
#include <sys/types.h>
#include <jansson.h>
#include <iostream>
#define CDC_CONNECTOR_VERSION "1.0.0"
@ -24,28 +26,10 @@ static const char REQUEST_MSG[] = "REQUEST-DATA ";
namespace
{
static inline int nointr_read(int fd, void *dest, size_t size)
static inline void millisleep(int millis)
{
int rc = read(fd, dest, size);
while (rc == -1 && errno == EINTR)
{
rc = read(fd, dest, size);
}
return rc;
}
static inline int nointr_write(int fd, const void *src, size_t size)
{
int rc = write(fd, src, size);
while (rc == -1 && errno == EINTR)
{
rc = write(fd, src, size);
}
return rc;
struct timespec ts = {0, 1000000};
nanosleep(&ts, NULL);
}
static std::string bin2hex(const uint8_t *data, size_t len)
@ -133,7 +117,8 @@ Connection::Connection(const std::string& address,
m_port(port),
m_user(user),
m_password(password),
m_flags(flags)
m_flags(flags),
m_running(true)
{
}
@ -167,6 +152,15 @@ bool Connection::createConnection()
}
m_fd = fd;
int fl;
if ((fl = fcntl(fd, F_GETFL, 0)) == -1 ||
fcntl(fd, F_SETFL, fl | O_NONBLOCK) == -1)
{
char err[ERRBUF_SIZE];
m_error = "Failed to set socket non-blocking: ";
m_error += strerror_r(errno, err, sizeof (err));
}
if (connect(fd, (struct sockaddr*) &remote, sizeof (remote)) == -1)
{
@ -187,7 +181,7 @@ void Connection::closeConnection()
{
if (m_fd != -1)
{
nointr_write(m_fd, CLOSE_MSG, sizeof (CLOSE_MSG) - 1);
nointr_write(CLOSE_MSG, sizeof (CLOSE_MSG) - 1);
close(m_fd);
m_fd = -1;
}
@ -206,27 +200,13 @@ bool Connection::requestData(const std::string& table, const std::string& gtid)
req_msg += gtid;
}
if (nointr_write(m_fd, req_msg.c_str(), req_msg.length()) == -1)
if (nointr_write(req_msg.c_str(), req_msg.length()) == -1)
{
rval = false;
char err[ERRBUF_SIZE];
m_error = "Failed to write request: ";
m_error += strerror_r(errno, err, sizeof (err));
}
else
{
// Read the first row to know if data request was successful
Row row = read();
if (row)
{
m_first_row = row;
}
else
{
rval = false;
}
}
return rval;
}
@ -258,8 +238,14 @@ void Connection::processSchema(json_t* json)
{
json_t* name = json_object_get(v, "name");
json_t* type = json_object_get(v, "real_type");
if (type == NULL)
{
// Use the Avro type for generated columns
type = json_object_get(v, "type");
}
std::string nameval = name ? json_string_value(name) : "";
std::string typeval = type ? json_string_value(type) : "undefined";
std::string typeval = type ? (json_is_string(type) ? json_string_value(type) : "char(50)") : "undefined";
m_keys.push_back(nameval);
m_types.push_back(typeval);
}
@ -302,11 +288,7 @@ Row Connection::read()
Row rval;
std::string row;
if (m_first_row)
{
rval.swap(m_first_row);
}
else if (readRow(row))
if (readRow(row))
{
json_error_t err;
json_t* js = json_loads(row.c_str(), JSON_ALLOW_NUL, &err);
@ -315,6 +297,7 @@ Row Connection::read()
{
if (is_schema(js))
{
m_schema = row;
processSchema(js);
rval = Connection::read();
}
@ -345,7 +328,7 @@ bool Connection::doAuth()
std::string auth_str = generateAuthString(m_user, m_password);
/** Send the auth string */
if (nointr_write(m_fd, auth_str.c_str(), auth_str.length()) == -1)
if (nointr_write(auth_str.c_str(), auth_str.length()) == -1)
{
char err[ERRBUF_SIZE];
m_error = "Failed to write authentication data: ";
@ -357,7 +340,7 @@ bool Connection::doAuth()
char buf[READBUF_SIZE];
int bytes;
if ((bytes = nointr_read(m_fd, buf, sizeof (buf))) == -1)
if ((bytes = nointr_read(buf, sizeof (buf))) == -1)
{
char err[ERRBUF_SIZE];
m_error = "Failed to read authentication response: ";
@ -397,7 +380,7 @@ bool Connection::doRegistration()
reg_msg += type;
/** Send the registration message */
if (nointr_write(m_fd, reg_msg.c_str(), reg_msg.length()) == -1)
if (nointr_write(reg_msg.c_str(), reg_msg.length()) == -1)
{
char err[ERRBUF_SIZE];
m_error = "Failed to write registration message: ";
@ -409,7 +392,7 @@ bool Connection::doRegistration()
char buf[READBUF_SIZE];
int bytes;
if ((bytes = nointr_read(m_fd, buf, sizeof (buf))) == -1)
if ((bytes = nointr_read(buf, sizeof (buf))) == -1)
{
char err[ERRBUF_SIZE];
m_error = "Failed to read registration response: ";
@ -437,7 +420,7 @@ bool Connection::readRow(std::string& dest)
while (true)
{
char buf;
int rc = nointr_read(m_fd, &buf, 1);
int rc = nointr_read(&buf, 1);
if (rc == -1)
{
@ -469,4 +452,31 @@ bool Connection::readRow(std::string& dest)
return rval;
}
int Connection::nointr_read(void *dest, size_t size)
{
int rc = ::read(m_fd, dest, size);
while (m_running && rc == -1 && (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN))
{
millisleep(1);
rc = ::read(m_fd, dest, size);
}
return rc;
}
int Connection::nointr_write(const void *src, size_t size)
{
bool wouldblock = false;
int rc = write(m_fd, src, size);
while (m_running && rc == -1 && (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN))
{
millisleep(1);
rc = write(m_fd, src, size);
}
return rc;
}
}