diff --git a/connectors/cdc-connector/cdc_connector.cpp b/connectors/cdc-connector/cdc_connector.cpp index 5aa2814fa..be751488a 100644 --- a/connectors/cdc-connector/cdc_connector.cpp +++ b/connectors/cdc-connector/cdc_connector.cpp @@ -254,28 +254,32 @@ bool Connection::connect(const std::string& table, const std::string& gtid) m_error = "Failed to set socket non-blocking: "; m_error += strerror_r(errno, err, sizeof(err)); } - else if (do_auth() && do_registration()) + else { - std::string req_msg(REQUEST_MSG); - req_msg += table; + m_fd = c_fd.release(); + m_connected = true; - if (gtid.length()) + if (do_auth() && do_registration()) { - req_msg += " "; - req_msg += gtid; - } + std::string req_msg(REQUEST_MSG); + req_msg += table; - if (nointr_write(req_msg.c_str(), req_msg.length()) == -1) - { - char err[ERRBUF_SIZE]; - m_error = "Failed to write request: "; - m_error += strerror_r(errno, err, sizeof(err)); - } - else if ((m_first_row = read())) - { - rval = true; - m_connected = true; - m_fd = c_fd.release(); + if (gtid.length()) + { + req_msg += " "; + req_msg += gtid; + } + + if (nointr_write(req_msg.c_str(), req_msg.length()) == -1) + { + char err[ERRBUF_SIZE]; + m_error = "Failed to write request: "; + m_error += strerror_r(errno, err, sizeof(err)); + } + else if ((m_first_row = read())) + { + rval = true; + } } } } @@ -438,11 +442,12 @@ bool Connection::do_auth() std::string auth_str = generateAuthString(m_user, m_password); /** Send the auth string */ - if (nointr_write(auth_str.c_str(), auth_str.length()) == -1) + int rc = nointr_write(auth_str.c_str(), auth_str.length()); + if (rc <= 0) { char err[ERRBUF_SIZE]; m_error = "Failed to write authentication data: "; - m_error += strerror_r(errno, err, sizeof(err)); + m_error += rc == -1 ? strerror_r(errno, err, sizeof(err)) : "Write timeout"; } else { @@ -688,11 +693,12 @@ int Connection::nointr_read(void *dest, size_t size) int Connection::nointr_write(const void *src, size_t size) { int rc = 0; - int n_bytes = 0; + size_t n_bytes = 0; + const uint8_t* ptr = static_cast(src); - if (wait_for_event(POLLOUT) > 0) + do { - while ((rc = ::write(m_fd, src, size)) < 0 && errno == EINTR) + while ((rc = ::write(m_fd, ptr, size)) < 0 && errno == EINTR) { ; } @@ -707,8 +713,11 @@ int Connection::nointr_write(const void *src, size_t size) else if (rc > 0) { n_bytes += rc; + ptr += rc; + size -= rc; } } + while (n_bytes < size && wait_for_event(POLLOUT) > 0); return n_bytes; }