Merge branch '2.2' into develop

This commit is contained in:
Markus Mäkelä
2018-08-28 16:06:23 +03:00
9 changed files with 137 additions and 64 deletions

View File

@ -28,6 +28,7 @@
#include <poll.h>
#include <sstream>
#include <stdexcept>
#include <iterator>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
@ -121,7 +122,7 @@ public:
Closer(T t):
m_t(t),
m_close(false)
m_close(true)
{
}
@ -185,7 +186,6 @@ Connection::Connection(const std::string& address,
m_timeout(timeout),
m_connected(false)
{
m_buf_ptr = m_buffer.begin();
}
Connection::~Connection()
@ -276,7 +276,7 @@ bool Connection::connect(const std::string& table, const std::string& gtid)
m_error = "Failed to write request: ";
m_error += strerror_r(errno, err, sizeof(err));
}
else if ((m_first_row = read()))
else if (read_schema())
{
rval = true;
}
@ -391,18 +391,13 @@ SRow Connection::process_row(json_t* js)
return rval;
}
SRow Connection::read()
bool Connection::read_schema()
{
m_error.clear();
SRow rval;
bool rval = false;
std::string row;
if (m_first_row)
{
rval.swap(m_first_row);
assert(!m_first_row);
}
else if (read_row(row))
if (read_row(row))
{
json_error_t err;
json_t* js = json_loads(row.c_str(), JSON_ALLOW_NUL, &err);
@ -413,11 +408,7 @@ SRow Connection::read()
{
m_schema = row;
process_schema(js);
rval = Connection::read();
}
else
{
rval = process_row(js);
rval = true;
}
json_decref(js);
@ -429,6 +420,40 @@ SRow Connection::read()
}
}
if (m_error == CDC::TIMEOUT)
{
assert(rval == false);
m_error += ". Data received so far: '";
std::copy(m_buffer.begin(), m_buffer.end(), std::back_inserter(m_error));
m_error += "'";
}
return rval;
}
SRow Connection::read()
{
m_error.clear();
SRow rval;
std::string row;
if (read_row(row))
{
json_error_t err;
json_t* js = json_loads(row.c_str(), JSON_ALLOW_NUL, &err);
if (js)
{
rval = process_row(js);
json_decref(js);
}
else
{
m_error = "Failed to parse JSON: ";
m_error += err.text;
}
}
return rval;
}
@ -517,14 +542,14 @@ bool Connection::do_registration()
return rval;
}
bool Connection::is_error(const char* str)
bool Connection::is_error()
{
bool rval = false;
if (str[0] == 'E' && str[1] == 'R' && str[2] == 'R')
if (m_buffer.size() >= 3 && m_buffer[0] == 'E' && m_buffer[1] == 'R' && m_buffer[2] == 'R')
{
m_error = "MaxScale responded with an error: ";
m_error += str;
m_error.append(m_buffer.begin(), m_buffer.end());
rval = true;
}
@ -539,11 +564,19 @@ bool Connection::read_row(std::string& dest)
{
if (!m_buffer.empty())
{
std::vector<char>::iterator it = std::find(m_buf_ptr, m_buffer.end(), '\n');
if (is_error())
{
rval = false;
break;
}
std::deque<char>::iterator it = std::find(m_buffer.begin(), m_buffer.end(), '\n');
if (it != m_buffer.end())
{
dest.assign(m_buf_ptr, it);
m_buf_ptr = it + 1;
dest.assign(m_buffer.begin(), it);
m_buffer.erase(m_buffer.begin(), std::next(it));
assert(m_buffer.empty() || m_buffer[0] != '\n');
break;
}
}
@ -566,27 +599,14 @@ bool Connection::read_row(std::string& dest)
break;
}
if (!m_connected)
{
// This is here to work around a missing newline in MaxScale error messages
buf[rc] = '\0';
if (is_error(buf))
{
rval = false;
break;
}
}
m_buffer.erase(m_buffer.begin(), m_buf_ptr);
assert(std::find(m_buffer.begin(), m_buffer.end(), '\n') == m_buffer.end());
m_buffer.insert(m_buffer.end(), buf, buf + rc);
m_buf_ptr = m_buffer.begin();
}
std::copy(buf, buf + rc, std::back_inserter(m_buffer));
if (!m_connected && is_error(dest.c_str()))
{
rval = false;
if (is_error())
{
rval = false;
break;
}
}
return rval;

View File

@ -21,6 +21,7 @@
#include <string>
#include <tr1/memory>
#include <vector>
#include <deque>
#include <map>
#include <algorithm>
#include <jansson.h>
@ -137,17 +138,17 @@ private:
SValueVector m_keys;
SValueVector m_types;
int m_timeout;
std::vector<char> m_buffer;
std::vector<char>::iterator m_buf_ptr;
std::deque<char> m_buffer;
SRow m_first_row;
bool m_connected;
bool do_auth();
bool do_registration();
bool read_row(std::string& dest);
bool read_schema();
void process_schema(json_t* json);
SRow process_row(json_t*);
bool is_error(const char* str);
bool is_error();
// Lower-level functions
int wait_for_event(short events);