Split schema and row processing
The recursive calls into `read` caused unnecessary slowness in the connection phase. The actual first row should only be read when the data is requested. This can possibly solve the false timeout errors caused by slow sending of the first row of data.
This commit is contained in:
@ -276,16 +276,10 @@ bool Connection::connect(const std::string& table, const std::string& gtid)
|
|||||||
m_error = "Failed to write request: ";
|
m_error = "Failed to write request: ";
|
||||||
m_error += strerror_r(errno, err, sizeof(err));
|
m_error += strerror_r(errno, err, sizeof(err));
|
||||||
}
|
}
|
||||||
else if ((m_first_row = read()))
|
else if (read_schema())
|
||||||
{
|
{
|
||||||
rval = true;
|
rval = true;
|
||||||
}
|
}
|
||||||
else if (m_error == CDC::TIMEOUT)
|
|
||||||
{
|
|
||||||
m_error += ". Data received so far: '";
|
|
||||||
std::copy(m_buffer.begin(), m_buffer.end(), std::back_inserter(m_error));
|
|
||||||
m_error += "'";
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -397,18 +391,13 @@ SRow Connection::process_row(json_t* js)
|
|||||||
return rval;
|
return rval;
|
||||||
}
|
}
|
||||||
|
|
||||||
SRow Connection::read()
|
bool Connection::read_schema()
|
||||||
{
|
{
|
||||||
m_error.clear();
|
m_error.clear();
|
||||||
SRow rval;
|
bool rval = false;
|
||||||
std::string row;
|
std::string row;
|
||||||
|
|
||||||
if (m_first_row)
|
if (read_row(row))
|
||||||
{
|
|
||||||
rval.swap(m_first_row);
|
|
||||||
assert(!m_first_row);
|
|
||||||
}
|
|
||||||
else if (read_row(row))
|
|
||||||
{
|
{
|
||||||
json_error_t err;
|
json_error_t err;
|
||||||
json_t* js = json_loads(row.c_str(), JSON_ALLOW_NUL, &err);
|
json_t* js = json_loads(row.c_str(), JSON_ALLOW_NUL, &err);
|
||||||
@ -419,11 +408,7 @@ SRow Connection::read()
|
|||||||
{
|
{
|
||||||
m_schema = row;
|
m_schema = row;
|
||||||
process_schema(js);
|
process_schema(js);
|
||||||
rval = Connection::read();
|
rval = true;
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
rval = process_row(js);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
json_decref(js);
|
json_decref(js);
|
||||||
@ -435,6 +420,40 @@ SRow Connection::read()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (m_error == CDC::TIMEOUT)
|
||||||
|
{
|
||||||
|
assert(rval == false);
|
||||||
|
m_error += ". Data received so far: '";
|
||||||
|
std::copy(m_buffer.begin(), m_buffer.end(), std::back_inserter(m_error));
|
||||||
|
m_error += "'";
|
||||||
|
}
|
||||||
|
|
||||||
|
return rval;
|
||||||
|
}
|
||||||
|
|
||||||
|
SRow Connection::read()
|
||||||
|
{
|
||||||
|
m_error.clear();
|
||||||
|
SRow rval;
|
||||||
|
std::string row;
|
||||||
|
|
||||||
|
if (read_row(row))
|
||||||
|
{
|
||||||
|
json_error_t err;
|
||||||
|
json_t* js = json_loads(row.c_str(), JSON_ALLOW_NUL, &err);
|
||||||
|
|
||||||
|
if (js)
|
||||||
|
{
|
||||||
|
rval = process_row(js);
|
||||||
|
json_decref(js);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
m_error = "Failed to parse JSON: ";
|
||||||
|
m_error += err.text;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return rval;
|
return rval;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -145,6 +145,7 @@ private:
|
|||||||
bool do_auth();
|
bool do_auth();
|
||||||
bool do_registration();
|
bool do_registration();
|
||||||
bool read_row(std::string& dest);
|
bool read_row(std::string& dest);
|
||||||
|
bool read_schema();
|
||||||
void process_schema(json_t* json);
|
void process_schema(json_t* json);
|
||||||
SRow process_row(json_t*);
|
SRow process_row(json_t*);
|
||||||
bool is_error();
|
bool is_error();
|
||||||
|
Reference in New Issue
Block a user