diff --git a/connectors/cdc-connector/cdc_connector.cpp b/connectors/cdc-connector/cdc_connector.cpp index 802719277..f5913bf00 100644 --- a/connectors/cdc-connector/cdc_connector.cpp +++ b/connectors/cdc-connector/cdc_connector.cpp @@ -276,16 +276,10 @@ bool Connection::connect(const std::string& table, const std::string& gtid) m_error = "Failed to write request: "; m_error += strerror_r(errno, err, sizeof(err)); } - else if ((m_first_row = read())) + else if (read_schema()) { rval = true; } - else if (m_error == CDC::TIMEOUT) - { - m_error += ". Data received so far: '"; - std::copy(m_buffer.begin(), m_buffer.end(), std::back_inserter(m_error)); - m_error += "'"; - } } } } @@ -397,18 +391,13 @@ SRow Connection::process_row(json_t* js) return rval; } -SRow Connection::read() +bool Connection::read_schema() { m_error.clear(); - SRow rval; + bool rval = false; std::string row; - if (m_first_row) - { - rval.swap(m_first_row); - assert(!m_first_row); - } - else if (read_row(row)) + if (read_row(row)) { json_error_t err; json_t* js = json_loads(row.c_str(), JSON_ALLOW_NUL, &err); @@ -419,11 +408,7 @@ SRow Connection::read() { m_schema = row; process_schema(js); - rval = Connection::read(); - } - else - { - rval = process_row(js); + rval = true; } json_decref(js); @@ -435,6 +420,40 @@ SRow Connection::read() } } + if (m_error == CDC::TIMEOUT) + { + assert(rval == false); + m_error += ". Data received so far: '"; + std::copy(m_buffer.begin(), m_buffer.end(), std::back_inserter(m_error)); + m_error += "'"; + } + + return rval; +} + +SRow Connection::read() +{ + m_error.clear(); + SRow rval; + std::string row; + + if (read_row(row)) + { + json_error_t err; + json_t* js = json_loads(row.c_str(), JSON_ALLOW_NUL, &err); + + if (js) + { + rval = process_row(js); + json_decref(js); + } + else + { + m_error = "Failed to parse JSON: "; + m_error += err.text; + } + } + return rval; } diff --git a/connectors/cdc-connector/cdc_connector.h b/connectors/cdc-connector/cdc_connector.h index c5787d165..fa0b8e40f 100644 --- a/connectors/cdc-connector/cdc_connector.h +++ b/connectors/cdc-connector/cdc_connector.h @@ -145,6 +145,7 @@ private: bool do_auth(); bool do_registration(); bool read_row(std::string& dest); + bool read_schema(); void process_schema(json_t* json); SRow process_row(json_t*); bool is_error();