From 3e0492256519606afb41ca66dbb660ef07c963e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Wed, 17 Apr 2019 12:31:13 +0300 Subject: [PATCH] MXS-2415: Fix client callbacks in arvorouter The DCB callbacks shouldn't be used to send more events as they cause the callback to be called recursively. The recursive calls caused rows to be sent before the schemas for the rows were sent. Queuing the events via the worker mechanism prevents this. --- .../modules/routing/avrorouter/avro_client.cc | 29 ++++++------------- .../modules/routing/avrorouter/avrorouter.hh | 7 ++--- 2 files changed, 11 insertions(+), 25 deletions(-) diff --git a/server/modules/routing/avrorouter/avro_client.cc b/server/modules/routing/avrorouter/avro_client.cc index e80d171d0..566b7b571 100644 --- a/server/modules/routing/avrorouter/avro_client.cc +++ b/server/modules/routing/avrorouter/avro_client.cc @@ -34,6 +34,7 @@ #include #include #include +#include std::pair get_avrofile_and_gtid(std::string file); @@ -239,22 +240,14 @@ bool file_in_dir(const char* dir, const char* file) } /** - * @brief The client callback for sending data - * - * @param dcb Client DCB - * @param reason Why the callback was called - * @param userdata Data provided when the callback was added - * @return Always 0 + * Queue the client callback for execution */ -int avro_client_callback(DCB* dcb, DCB_REASON reason, void* userdata) +void AvroSession::queue_client_callback() { - if (reason == DCB_REASON_DRAINED) - { - AvroSession* client = static_cast(userdata); - client->client_callback(); - } - - return 0; + auto worker = mxs::RoutingWorker::get(mxs::RoutingWorker::MAIN); + worker->execute([this]() { + client_callback(); + }, mxs::RoutingWorker::EXECUTE_QUEUED); } /** @@ -338,11 +331,7 @@ void AvroSession::process_command(GWBUF* queue) if (file_in_dir(router->avrodir.c_str(), avro_binfile.c_str())) { - /* set callback routine for data sending */ - dcb_add_callback(dcb, DCB_REASON_DRAINED, avro_client_callback, this); - - /* Add fake event that will call the avro_client_callback() routine */ - poll_fake_write_event(dcb); + queue_client_callback(); } else { @@ -734,7 +723,7 @@ void AvroSession::client_callback() if (next_file || read_more) { - poll_fake_write_event(dcb); + queue_client_callback(); } } diff --git a/server/modules/routing/avrorouter/avrorouter.hh b/server/modules/routing/avrorouter/avrorouter.hh index 7f22af073..d51f78e0d 100644 --- a/server/modules/routing/avrorouter/avrorouter.hh +++ b/server/modules/routing/avrorouter/avrorouter.hh @@ -170,11 +170,6 @@ public: */ int routeQuery(GWBUF* buffer); - /** - * Handler for the EPOLLOUT event - */ - void client_callback(); - private: AvroSession(Avro* instance, MXS_SESSION* session); @@ -187,6 +182,8 @@ private: bool seek_to_gtid(); bool stream_data(); void rotate_avro_file(std::string fullname); + void client_callback(); + void queue_client_callback(); }; void read_table_info(uint8_t* ptr,