From 68b4f20436e7ac71708c604dad74c40dc4696f7a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Mon, 27 Aug 2018 13:22:16 +0300 Subject: [PATCH] Split schema and row processing The recursive calls into `read` caused unnecessary slowness in the connection phase. The actual first row should only be read when the data is requested. This can possibly solve the false timeout errors caused by slow sending of the first row of data. --- connectors/cdc-connector/cdc_connector.cpp | 59 ++++++++++++++-------- connectors/cdc-connector/cdc_connector.h | 1 + 2 files changed, 40 insertions(+), 20 deletions(-) diff --git a/connectors/cdc-connector/cdc_connector.cpp b/connectors/cdc-connector/cdc_connector.cpp index 802719277..f5913bf00 100644 --- a/connectors/cdc-connector/cdc_connector.cpp +++ b/connectors/cdc-connector/cdc_connector.cpp @@ -276,16 +276,10 @@ bool Connection::connect(const std::string& table, const std::string& gtid) m_error = "Failed to write request: "; m_error += strerror_r(errno, err, sizeof(err)); } - else if ((m_first_row = read())) + else if (read_schema()) { rval = true; } - else if (m_error == CDC::TIMEOUT) - { - m_error += ". Data received so far: '"; - std::copy(m_buffer.begin(), m_buffer.end(), std::back_inserter(m_error)); - m_error += "'"; - } } } } @@ -397,18 +391,13 @@ SRow Connection::process_row(json_t* js) return rval; } -SRow Connection::read() +bool Connection::read_schema() { m_error.clear(); - SRow rval; + bool rval = false; std::string row; - if (m_first_row) - { - rval.swap(m_first_row); - assert(!m_first_row); - } - else if (read_row(row)) + if (read_row(row)) { json_error_t err; json_t* js = json_loads(row.c_str(), JSON_ALLOW_NUL, &err); @@ -419,11 +408,7 @@ SRow Connection::read() { m_schema = row; process_schema(js); - rval = Connection::read(); - } - else - { - rval = process_row(js); + rval = true; } json_decref(js); @@ -435,6 +420,40 @@ SRow Connection::read() } } + if (m_error == CDC::TIMEOUT) + { + assert(rval == false); + m_error += ". Data received so far: '"; + std::copy(m_buffer.begin(), m_buffer.end(), std::back_inserter(m_error)); + m_error += "'"; + } + + return rval; +} + +SRow Connection::read() +{ + m_error.clear(); + SRow rval; + std::string row; + + if (read_row(row)) + { + json_error_t err; + json_t* js = json_loads(row.c_str(), JSON_ALLOW_NUL, &err); + + if (js) + { + rval = process_row(js); + json_decref(js); + } + else + { + m_error = "Failed to parse JSON: "; + m_error += err.text; + } + } + return rval; } diff --git a/connectors/cdc-connector/cdc_connector.h b/connectors/cdc-connector/cdc_connector.h index c5787d165..fa0b8e40f 100644 --- a/connectors/cdc-connector/cdc_connector.h +++ b/connectors/cdc-connector/cdc_connector.h @@ -145,6 +145,7 @@ private: bool do_auth(); bool do_registration(); bool read_row(std::string& dest); + bool read_schema(); void process_schema(json_t* json); SRow process_row(json_t*); bool is_error();