MXS-2415: Fix client callbacks in arvorouter

The DCB callbacks shouldn't be used to send more events as they cause the
callback to be called recursively. The recursive calls caused rows to be
sent before the schemas for the rows were sent. Queuing the events via the
worker mechanism prevents this.
This commit is contained in:
Markus Mäkelä 2019-04-17 12:31:13 +03:00
parent 61f728c05a
commit 3e04922565
No known key found for this signature in database
GPG Key ID: 72D48FCE664F7B19
2 changed files with 11 additions and 25 deletions

View File

@ -34,6 +34,7 @@
#include <maxscale/alloc.h>
#include <maxscale/buffer.hh>
#include <maxscale/utils.hh>
#include <maxscale/routingworker.hh>
std::pair<std::string, std::string> get_avrofile_and_gtid(std::string file);
@ -239,22 +240,14 @@ bool file_in_dir(const char* dir, const char* file)
}
/**
* @brief The client callback for sending data
*
* @param dcb Client DCB
* @param reason Why the callback was called
* @param userdata Data provided when the callback was added
* @return Always 0
* Queue the client callback for execution
*/
int avro_client_callback(DCB* dcb, DCB_REASON reason, void* userdata)
void AvroSession::queue_client_callback()
{
if (reason == DCB_REASON_DRAINED)
{
AvroSession* client = static_cast<AvroSession*>(userdata);
client->client_callback();
}
return 0;
auto worker = mxs::RoutingWorker::get(mxs::RoutingWorker::MAIN);
worker->execute([this]() {
client_callback();
}, mxs::RoutingWorker::EXECUTE_QUEUED);
}
/**
@ -338,11 +331,7 @@ void AvroSession::process_command(GWBUF* queue)
if (file_in_dir(router->avrodir.c_str(), avro_binfile.c_str()))
{
/* set callback routine for data sending */
dcb_add_callback(dcb, DCB_REASON_DRAINED, avro_client_callback, this);
/* Add fake event that will call the avro_client_callback() routine */
poll_fake_write_event(dcb);
queue_client_callback();
}
else
{
@ -734,7 +723,7 @@ void AvroSession::client_callback()
if (next_file || read_more)
{
poll_fake_write_event(dcb);
queue_client_callback();
}
}

View File

@ -170,11 +170,6 @@ public:
*/
int routeQuery(GWBUF* buffer);
/**
* Handler for the EPOLLOUT event
*/
void client_callback();
private:
AvroSession(Avro* instance, MXS_SESSION* session);
@ -187,6 +182,8 @@ private:
bool seek_to_gtid();
bool stream_data();
void rotate_avro_file(std::string fullname);
void client_callback();
void queue_client_callback();
};
void read_table_info(uint8_t* ptr,