diff --git a/connectors/cdc-connector/cdc_connector.cpp b/connectors/cdc-connector/cdc_connector.cpp index aa014b56e..00eb808dc 100644 --- a/connectors/cdc-connector/cdc_connector.cpp +++ b/connectors/cdc-connector/cdc_connector.cpp @@ -186,7 +186,6 @@ Connection::Connection(const std::string& address, m_timeout(timeout), m_connected(false) { - m_buf_ptr = m_buffer.begin(); } Connection::~Connection() @@ -283,8 +282,9 @@ bool Connection::connect(const std::string& table, const std::string& gtid) } else { - m_error += ". Data received so far: "; + m_error += ". Data received so far: '"; std::copy(m_buffer.begin(), m_buffer.end(), std::back_inserter(m_error)); + m_error += "'"; } } } @@ -545,11 +545,13 @@ bool Connection::read_row(std::string& dest) { if (!m_buffer.empty()) { - std::vector::iterator it = std::find(m_buf_ptr, m_buffer.end(), '\n'); + std::deque::iterator it = std::find(m_buffer.begin(), m_buffer.end(), '\n'); + if (it != m_buffer.end()) { - dest.assign(m_buf_ptr, it); - m_buf_ptr = it + 1; + dest.assign(m_buffer.begin(), it); + m_buffer.erase(m_buffer.begin(), std::next(it)); + assert(m_buffer.empty() || m_buffer[0] != '\n'); break; } } @@ -572,22 +574,14 @@ bool Connection::read_row(std::string& dest) break; } - if (!m_connected) - { - // This is here to work around a missing newline in MaxScale error messages - buf[rc] = '\0'; - - if (is_error(buf)) - { - rval = false; - break; - } - } - - m_buffer.erase(m_buffer.begin(), m_buf_ptr); assert(std::find(m_buffer.begin(), m_buffer.end(), '\n') == m_buffer.end()); - m_buffer.insert(m_buffer.end(), buf, buf + rc); - m_buf_ptr = m_buffer.begin(); + std::copy(buf, buf + rc, std::back_inserter(m_buffer)); + + if (is_error(&m_buffer[0])) + { + rval = false; + break; + } } if (is_error(dest.c_str())) diff --git a/connectors/cdc-connector/cdc_connector.h b/connectors/cdc-connector/cdc_connector.h index fa01ac28b..e91c371f1 100644 --- a/connectors/cdc-connector/cdc_connector.h +++ b/connectors/cdc-connector/cdc_connector.h @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -137,8 +138,7 @@ private: SValueVector m_keys; SValueVector m_types; int m_timeout; - std::vector m_buffer; - std::vector::iterator m_buf_ptr; + std::deque m_buffer; SRow m_first_row; bool m_connected;