diff --git a/include/maxscale/dcb.hh b/include/maxscale/dcb.hh index 71d2fd18f..475fafd75 100644 --- a/include/maxscale/dcb.hh +++ b/include/maxscale/dcb.hh @@ -172,63 +172,65 @@ typedef enum * Note that the first few fields (up to and including "entry_is_ready") must * precisely match the LIST_ENTRY structure defined in the list manager. */ -struct DCB +struct DCB : public MXB_POLL_DATA { - MXB_POLL_DATA poll; - bool dcb_errhandle_called; /*< this can be called only once */ - dcb_role_t dcb_role; - int fd; /**< The descriptor */ - dcb_state_t state; /**< Current descriptor state */ - SSL_STATE ssl_state; /**< Current state of SSL if in use */ - int flags; /**< DCB flags */ - char* remote; /**< Address of remote end */ - char* user; /**< User name for connection */ - struct sockaddr_storage ip; /**< remote IPv4/IPv6 address */ - char* protoname; /**< Name of the protocol */ - void* protocol; /**< The protocol specific state */ - size_t protocol_packet_length; /**< How long the protocol specific packet is */ - size_t protocol_bytes_processed; /**< How many bytes of a packet have been read */ - struct session* session; /**< The owning session */ - Listener* listener; /**< For a client DCB, the listener data */ - MXS_PROTOCOL func; /**< The protocol functions for this descriptor */ - MXS_AUTHENTICATOR authfunc; /**< The authenticator functions for this descriptor - * */ - uint64_t writeqlen; /**< Current number of byes in the write queue */ - uint64_t high_water; /**< High water mark of write queue */ - uint64_t low_water; /**< Low water mark of write queue */ - GWBUF* writeq; /**< Write Data Queue */ - GWBUF* delayq; /**< Delay Backend Write Data Queue */ - GWBUF* readq; /**< Read queue for storing incomplete reads */ - GWBUF* fakeq; /**< Fake event queue for generated events */ - uint32_t fake_event; /**< Fake event to be delivered to handler */ + DCB(dcb_role_t role, Listener* listener, SERVICE* service); + ~DCB(); - DCBSTATS stats; /**< DCB related statistics */ - DCB* nextpersistent; /**< Next DCB in the persistent pool for SERVER */ - time_t persistentstart; /**< 0: Not in the persistent pool. - * -1: Evicted from the persistent pool and being closed. - * non-0: Time when placed in the persistent pool. - */ - SERVICE* service; /**< The related service */ - void* data; /**< Specific client data, shared between DCBs of this session */ - void* authenticator_data; /**< The authenticator data for this DCB */ - DCB_CALLBACK* callbacks; /**< The list of callbacks for the DCB */ - int64_t last_read; /*< Last time the DCB received data */ - struct server* server; /**< The associated backend server */ - SSL* ssl; /*< SSL struct for connection */ - bool ssl_read_want_read; /*< Flag */ - bool ssl_read_want_write; /*< Flag */ - bool ssl_write_want_read; /*< Flag */ - bool ssl_write_want_write;/*< Flag */ - bool was_persistent; /**< Whether this DCB was in the persistent pool */ - bool high_water_reached; /** High water mark reached, to determine whether need release - * throttle */ + bool dcb_errhandle_called = false; /**< this can be called only once */ + dcb_role_t dcb_role; + int fd = DCBFD_CLOSED; /**< The descriptor */ + dcb_state_t state = DCB_STATE_ALLOC; /**< Current descriptor state */ + SSL_STATE ssl_state = SSL_HANDSHAKE_UNKNOWN; /**< Current state of SSL if in use */ + int flags = 0; /**< DCB flags */ + char* remote = nullptr; /**< Address of remote end */ + char* user = nullptr; /**< User name for connection */ + struct sockaddr_storage ip; /**< remote IPv4/IPv6 address */ + char* protoname = nullptr; /**< Name of the protocol */ + void* protocol = nullptr; /**< The protocol specific state */ + size_t protocol_packet_length = 0; /**< protocol packet length */ + size_t protocol_bytes_processed = 0; /**< How many bytes have been read */ + struct session* session = nullptr; /**< The owning session */ + Listener* listener = nullptr; /**< For a client DCB, the listener data */ + MXS_PROTOCOL func = {}; /**< Protocol functions for the DCB */ + MXS_AUTHENTICATOR authfunc = {}; /**< Authenticator functions for the DCB */ + uint64_t writeqlen = 0; /**< Bytes in writeq */ + uint64_t high_water = 0; /**< High water mark of write queue */ + uint64_t low_water = 0; /**< Low water mark of write queue */ + GWBUF* writeq = nullptr; /**< Write Data Queue */ + GWBUF* delayq = nullptr; /**< Delay Backend Write Data Queue */ + GWBUF* readq = nullptr; /**< Read queue for incomplete reads */ + GWBUF* fakeq = nullptr; /**< Fake event queue for generated events */ + uint32_t fake_event = 0; /**< Fake event to be delivered to handler */ + + DCBSTATS stats = {}; /**< DCB related statistics */ + DCB* nextpersistent = nullptr; /**< Next DCB in the persistent pool for SERVER */ + time_t persistentstart = 0; /**< 0: Not in the persistent pool. + * -1: Evicted from the persistent pool and being closed. + * non-0: Time when placed in the persistent pool. + */ + SERVICE* service = nullptr; /**< The related service */ + void* data = nullptr; /**< Client protocol data, owned by client DCB */ + void* authenticator_data = nullptr;/**< The authenticator data for this DCB */ + DCB_CALLBACK* callbacks = nullptr; /**< The list of callbacks for the DCB */ + int64_t last_read = 0; /**< Last time the DCB received data */ + struct server* server = nullptr; /**< The associated backend server */ + SSL* ssl = nullptr; /**< SSL struct for connection */ + bool ssl_read_want_read = false; + bool ssl_read_want_write = false; + bool ssl_write_want_read = false; + bool ssl_write_want_write = false; + bool was_persistent = false; /**< Whether this DCB was in the persistent pool */ + bool high_water_reached = false; /** High water mark reached, to determine whether we need to + * release + * throttle */ struct { - DCB* next; /**< Next DCB in owning thread's list */ - DCB* tail; /**< Last DCB in owning thread's list */ + DCB* next = nullptr; /**< Next DCB in owning thread's list */ + DCB* tail = nullptr; /**< Last DCB in owning thread's list */ } thread; - uint32_t n_close; /** How many times dcb_close has been called. */ - char* path; /** If a Unix socket, the path it was bound to. */ + uint32_t n_close = 0; /** How many times dcb_close has been called. */ + char* path = nullptr; /** If a Unix socket, the path it was bound to. */ }; /** diff --git a/server/core/dcb.cc b/server/core/dcb.cc index c0d43fdd0..960bdb132 100644 --- a/server/core/dcb.cc +++ b/server/core/dcb.cc @@ -70,7 +70,6 @@ namespace static struct { - DCB dcb_initialized; /** A DCB with null values, used for initialization. */ DCB** all_dcbs; /** #workers sized array of pointers to DCBs where dcbs are listed. */ bool check_timeouts; /** Should session timeouts be checked. */ } this_unit; @@ -123,14 +122,6 @@ static int downstream_throttle_callback(DCB* dcb, DCB_REASON reason, void* void dcb_global_init() { - this_unit.dcb_initialized.fd = DCBFD_CLOSED; - this_unit.dcb_initialized.state = DCB_STATE_ALLOC; - this_unit.dcb_initialized.ssl_state = SSL_HANDSHAKE_UNKNOWN; - this_unit.dcb_initialized.poll.handler = dcb_poll_handler; - this_unit.dcb_initialized.high_water_reached = false; - this_unit.dcb_initialized.low_water = config_writeq_low_water(); - this_unit.dcb_initialized.high_water = config_writeq_high_water(); - int nthreads = config_threadcount(); if ((this_unit.all_dcbs = (DCB**)MXS_CALLOC(nthreads, sizeof(DCB*))) == NULL) @@ -150,20 +141,71 @@ uint64_t dcb_get_session_id(DCB* dcb) return (dcb && dcb->session) ? dcb->session->ses_id : 0; } -/** - * @brief Initialize a DCB - * - * This routine puts initial values into the fields of the DCB pointed to - * by the parameter. - * - * Most fields can be initialized by the assignment of the static - * initialized DCB. The exception is the bitmask. - * - * @param *dcb Pointer to the DCB to be initialized - */ -static void dcb_initialize(DCB* dcb) +static MXB_WORKER* get_dcb_owner(dcb_role_t role) { - *dcb = this_unit.dcb_initialized; + MXB_WORKER* owner; + + if (role == DCB_ROLE_SERVICE_LISTENER) + { + /** All listeners are owned by the main thread (i.e. thread no. 0) */ + owner = RoutingWorker::get(RoutingWorker::MAIN); + } + else + { + /** Otherwise the DCB is owned by the thread that allocates it */ + mxb_assert(RoutingWorker::get_current_id() != -1); + owner = RoutingWorker::get_current(); + } + + return owner; +} + +DCB::DCB(dcb_role_t role, Listener* listener, SERVICE* service) + : MXB_POLL_DATA{dcb_poll_handler, get_dcb_owner(role)} + , dcb_role(role) + , listener(listener) + , high_water(config_writeq_high_water()) + , low_water(config_writeq_low_water()) + , service(service) + , last_read(mxs_clock()) +{ +} + +DCB::~DCB() +{ + if (data && authfunc.free) + { + authfunc.free(this); + } + + if (authfunc.destroy) + { + authfunc.destroy(authenticator_data); + } + + while (callbacks) + { + DCB_CALLBACK* tmp = callbacks; + callbacks = callbacks->next; + MXS_FREE(tmp); + } + + if (ssl) + { + SSL_free(ssl); + } + + MXS_FREE(protoname); + MXS_FREE(remote); + MXS_FREE(user); + MXS_FREE(path); + MXS_FREE(protocol); + gwbuf_free(delayq); + gwbuf_free(writeq); + gwbuf_free(readq); + gwbuf_free(fakeq); + + owner = reinterpret_cast(0xdeadbeef); } /** @@ -182,34 +224,7 @@ static void dcb_initialize(DCB* dcb) */ DCB* dcb_alloc(dcb_role_t role, Listener* listener, SERVICE* service) { - DCB* newdcb; - - if ((newdcb = (DCB*)MXS_MALLOC(sizeof(*newdcb))) == NULL) - { - return NULL; - } - - dcb_initialize(newdcb); - newdcb->dcb_role = role; - newdcb->listener = listener; - newdcb->service = service; - newdcb->last_read = mxs_clock(); - newdcb->low_water = config_writeq_low_water(); - newdcb->high_water = config_writeq_high_water(); - - if (role == DCB_ROLE_SERVICE_LISTENER) - { - /** All listeners are owned by the main thread (i.e. thread no. 0) */ - newdcb->poll.owner = RoutingWorker::get(RoutingWorker::MAIN); - } - else - { - /** Otherwise the DCB is owned by the thread that allocates it */ - mxb_assert(RoutingWorker::get_current_id() != -1); - newdcb->poll.owner = RoutingWorker::get_current(); - } - - return newdcb; + return new(std::nothrow) DCB(role, listener, service); } /** @@ -273,76 +288,7 @@ void dcb_free_all_memory(DCB* dcb) this_thread.current_dcb = NULL; } - DCB_CALLBACK* cb_dcb; - - if (dcb->protocol) - { - MXS_FREE(dcb->protocol); - } - if (dcb->data && dcb->authfunc.free) - { - dcb->authfunc.free(dcb); - dcb->data = NULL; - } - if (dcb->authfunc.destroy) - { - dcb->authfunc.destroy(dcb->authenticator_data); - dcb->authenticator_data = NULL; - } - if (dcb->protoname) - { - MXS_FREE(dcb->protoname); - } - if (dcb->remote) - { - MXS_FREE(dcb->remote); - } - if (dcb->user) - { - MXS_FREE(dcb->user); - } - - /* Clear write and read buffers */ - if (dcb->delayq) - { - gwbuf_free(dcb->delayq); - dcb->delayq = NULL; - } - if (dcb->writeq) - { - gwbuf_free(dcb->writeq); - dcb->writeq = NULL; - } - if (dcb->readq) - { - gwbuf_free(dcb->readq); - dcb->readq = NULL; - } - if (dcb->fakeq) - { - gwbuf_free(dcb->fakeq); - dcb->fakeq = NULL; - } - - while ((cb_dcb = dcb->callbacks) != NULL) - { - dcb->callbacks = cb_dcb->next; - MXS_FREE(cb_dcb); - } - - if (dcb->ssl) - { - SSL_free(dcb->ssl); - } - - if (dcb->path) - { - MXS_FREE(dcb->path); - } - - // Ensure that id is immediately the wrong one. - dcb->poll.owner = reinterpret_cast(0xdeadbeef); - MXS_FREE(dcb); + delete dcb; } /** @@ -390,7 +336,7 @@ DCB* dcb_connect(SERVER* server, MXS_SESSION* session, const char* protocol) user, session->client_dcb->remote, protocol, - static_cast(session->client_dcb->poll.owner)->id()); + static_cast(session->client_dcb->owner)->id()); if (dcb) { /** @@ -1102,7 +1048,7 @@ void dcb_close(DCB* dcb) { #if defined (SS_DEBUG) RoutingWorker* current = RoutingWorker::get_current(); - RoutingWorker* owner = static_cast(dcb->poll.owner); + RoutingWorker* owner = static_cast(dcb->owner); if (current && (current != owner)) { MXS_ALERT("dcb_close(%p) called by %d, owned by %d.", @@ -1152,7 +1098,7 @@ void dcb_close(DCB* dcb) } else { - RoutingWorker* worker = static_cast(dcb->poll.owner); + RoutingWorker* worker = static_cast(dcb->owner); mxb_assert(worker); worker->register_zombie(dcb); @@ -1183,7 +1129,7 @@ void dcb_close_in_owning_thread(DCB* dcb) // TODO: reference counted, so that we could addref before posting, thus // TODO: preventing too early a deletion. - MXB_WORKER* worker = static_cast(dcb->poll.owner); // The owning worker + MXB_WORKER* worker = static_cast(dcb->owner); // The owning worker mxb_assert(worker); intptr_t arg1 = (intptr_t)cb_dcb_close_in_owning_thread; @@ -1199,7 +1145,7 @@ void dcb_final_close(DCB* dcb) { #if defined (SS_DEBUG) RoutingWorker* current = RoutingWorker::get_current(); - RoutingWorker* owner = static_cast(dcb->poll.owner); + RoutingWorker* owner = static_cast(dcb->owner); if (current && (current != owner)) { MXS_ALERT("dcb_final_close(%p) called by %d, owned by %d.", @@ -1296,7 +1242,7 @@ void dcb_final_close(DCB* dcb) */ static bool dcb_maybe_add_persistent(DCB* dcb) { - RoutingWorker* owner = static_cast(dcb->poll.owner); + RoutingWorker* owner = static_cast(dcb->owner); if (dcb->user != NULL && (dcb->func.established == NULL || dcb->func.established(dcb)) && strlen(dcb->user) @@ -2776,7 +2722,7 @@ static void dcb_add_to_list_cb(int thread_id, void* data) { DCB* dcb = (DCB*)data; - mxb_assert(thread_id == static_cast(dcb->poll.owner)->id()); + mxb_assert(thread_id == static_cast(dcb->owner)->id()); dcb_add_to_list(dcb); } @@ -2791,7 +2737,7 @@ static void dcb_add_to_list(DCB* dcb) * is not in the list. Stopped listeners are not removed from the list. */ - int id = static_cast(dcb->poll.owner)->id(); + int id = static_cast(dcb->owner)->id(); mxb_assert(id == RoutingWorker::get_current_id()); if (this_unit.all_dcbs[id] == NULL) @@ -2814,7 +2760,7 @@ static void dcb_add_to_list(DCB* dcb) */ static void dcb_remove_from_list(DCB* dcb) { - int id = static_cast(dcb->poll.owner)->id(); + int id = static_cast(dcb->owner)->id(); if (dcb == this_unit.all_dcbs[id]) { @@ -2995,7 +2941,7 @@ int dcb_get_port(const DCB* dcb) static uint32_t dcb_process_poll_events(DCB* dcb, uint32_t events) { - RoutingWorker* owner = static_cast(dcb->poll.owner); + RoutingWorker* owner = static_cast(dcb->owner); mxb_assert(owner == RoutingWorker::get_current() || dcb->dcb_role == DCB_ROLE_SERVICE_LISTENER); @@ -3307,7 +3253,7 @@ static void poll_add_event_to_dcb(DCB* dcb, GWBUF* buf, uint32_t ev) if (task) { - RoutingWorker* worker = static_cast(dcb->poll.owner); + RoutingWorker* worker = static_cast(dcb->owner); worker->execute(std::unique_ptr(task), Worker::EXECUTE_QUEUED); } else @@ -3407,7 +3353,7 @@ public: { RoutingWorker& rworker = static_cast(worker); - mxb_assert(rworker.id() == static_cast(m_dcb->poll.owner)->id()); + mxb_assert(rworker.id() == static_cast(m_dcb->owner)->id()); bool added = dcb_add_to_worker(&rworker, m_dcb, m_events); mxb_assert(added); @@ -3469,7 +3415,7 @@ static bool dcb_add_to_worker(Worker* worker, DCB* dcb, uint32_t events) { // If this takes place on the main thread (all listening DCBs are // stored on the main thread)... - if (dcb->poll.owner == RoutingWorker::get_current()) + if (dcb->owner == RoutingWorker::get_current()) { // ..we'll add it immediately to the list, dcb_add_to_list(dcb); @@ -3478,7 +3424,7 @@ static bool dcb_add_to_worker(Worker* worker, DCB* dcb, uint32_t events) { // otherwise we must move the adding to the main thread. // TODO: Separate listening and other DCBs, as this is a mess. - RoutingWorker* worker = static_cast(dcb->poll.owner); + RoutingWorker* worker = static_cast(dcb->owner); mxb_assert(worker); intptr_t arg1 = (intptr_t)dcb_add_to_list_cb; @@ -3495,7 +3441,7 @@ static bool dcb_add_to_worker(Worker* worker, DCB* dcb, uint32_t events) } else { - mxb_assert(worker == dcb->poll.owner); + mxb_assert(worker == dcb->owner); if (worker == RoutingWorker::get_current()) { @@ -3517,7 +3463,7 @@ static bool dcb_add_to_worker(Worker* worker, DCB* dcb, uint32_t events) if (task) { - Worker* worker = static_cast(dcb->poll.owner); + Worker* worker = static_cast(dcb->owner); mxb_assert(worker); if (worker->execute(std::unique_ptr(task), Worker::EXECUTE_QUEUED)) @@ -3580,7 +3526,7 @@ int poll_add_dcb(DCB* dcb) else if (dcb->state == DCB_STATE_NOPOLLING) { // This DCB was removed and added back to epoll. Assign it to the same worker it started with. - owner = static_cast(dcb->poll.owner); + owner = static_cast(dcb->owner); } else { @@ -3589,16 +3535,16 @@ int poll_add_dcb(DCB* dcb) } new_state = DCB_STATE_POLLING; - dcb->poll.owner = owner; + dcb->owner = owner; } else { mxb_assert(dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER); mxb_assert(RoutingWorker::get_current_id() != -1); - mxb_assert(RoutingWorker::get_current() == dcb->poll.owner); + mxb_assert(RoutingWorker::get_current() == dcb->owner); new_state = DCB_STATE_POLLING; - owner = static_cast(dcb->poll.owner); + owner = static_cast(dcb->owner); } /** @@ -3619,7 +3565,7 @@ int poll_add_dcb(DCB* dcb) * debug asserts will be triggered. */ dcb->state = old_state; - dcb->poll.owner = RoutingWorker::get_current(); + dcb->owner = RoutingWorker::get_current(); rc = -1; } @@ -3669,7 +3615,7 @@ int poll_remove_dcb(DCB* dcb) } else { - Worker* worker = static_cast(dcb->poll.owner); + Worker* worker = static_cast(dcb->owner); mxb_assert(worker); if (worker->remove_fd(dcbfd)) diff --git a/server/core/routingworker.cc b/server/core/routingworker.cc index edef95151..783baacdb 100644 --- a/server/core/routingworker.cc +++ b/server/core/routingworker.cc @@ -536,7 +536,7 @@ RoutingWorker::SessionsById& RoutingWorker::session_registry() void RoutingWorker::register_zombie(DCB* pDcb) { - mxb_assert(pDcb->poll.owner == this); + mxb_assert(pDcb->owner == this); m_zombies.push_back(pDcb); } diff --git a/server/core/session.cc b/server/core/session.cc index f9a20b2b0..39fc49bd3 100644 --- a/server/core/session.cc +++ b/server/core/session.cc @@ -273,7 +273,7 @@ void session_link_backend_dcb(MXS_SESSION* session, DCB* dcb) dcb->session = session; dcb->service = session->service; /** Move this DCB under the same thread */ - dcb->poll.owner = session->client_dcb->poll.owner; + dcb->owner = session->client_dcb->owner; Session* ses = static_cast(session); ses->link_backend_dcb(dcb); @@ -1144,7 +1144,7 @@ bool session_delay_routing(MXS_SESSION* session, MXS_DOWNSTREAM down, GWBUF* buf try { Worker* worker = Worker::get_current(); - mxb_assert(worker == session->client_dcb->poll.owner); + mxb_assert(worker == session->client_dcb->owner); std::unique_ptr task(new DelayedRoutingTask(session, down, buffer)); // Delay the routing for at least a millisecond diff --git a/server/core/test/test_modulecmd.cc b/server/core/test/test_modulecmd.cc index 16d2246f4..381eba82b 100644 --- a/server/core/test/test_modulecmd.cc +++ b/server/core/test/test_modulecmd.cc @@ -312,13 +312,13 @@ int test_map() return 0; } -static DCB my_dcb; +static DCB* my_dcb = (DCB*)0xdeadbeef; bool ptrfn(const MODULECMD_ARG* argv, json_t** output) { bool rval = false; - if (argv->argc == 1 && argv->argv[0].value.dcb == &my_dcb) + if (argv->argc == 1 && argv->argv[0].value.dcb == my_dcb) { rval = true; } @@ -343,7 +343,7 @@ int test_pointers() const MODULECMD* cmd = modulecmd_find_command(ns, id); TEST(cmd, "The registered command should be found"); - const void* params[] = {&my_dcb}; + const void* params[] = {my_dcb}; MODULECMD_ARG* arg = modulecmd_arg_parse(cmd, 1, params); TEST(arg, "Parsing arguments should succeed"); diff --git a/server/modules/filter/test/mock_backend.cc b/server/modules/filter/test/mock_backend.cc index 6c3162193..e266aacbd 100644 --- a/server/modules/filter/test/mock_backend.cc +++ b/server/modules/filter/test/mock_backend.cc @@ -174,9 +174,9 @@ class ResultSetDCB : public DCB { public: ResultSetDCB() + : DCB(DCB_ROLE_CLIENT_HANDLER, nullptr, nullptr) { DCB* pDcb = this; - memset(pDcb, 0, sizeof(*pDcb)); pDcb->func.write = &ResultSetDCB::write; } diff --git a/server/modules/filter/test/mock_dcb.cc b/server/modules/filter/test/mock_dcb.cc index 9f19c1d66..85d2d4ff5 100644 --- a/server/modules/filter/test/mock_dcb.cc +++ b/server/modules/filter/test/mock_dcb.cc @@ -18,8 +18,6 @@ namespace void initialize_dcb(DCB* pDcb) { - memset(pDcb, 0, sizeof(DCB)); - pDcb->fd = DCBFD_CLOSED; pDcb->state = DCB_STATE_ALLOC; pDcb->ssl_state = SSL_HANDSHAKE_UNKNOWN; @@ -36,7 +34,8 @@ Dcb::Dcb(MXS_SESSION* pSession, const char* zUser, const char* zHost, Handler* pHandler) - : m_user(zUser) + : DCB(DCB_ROLE_CLIENT_HANDLER, nullptr, nullptr) + , m_user(zUser) , m_host(zHost) , m_pHandler(pHandler) { diff --git a/server/modules/protocol/MySQL/mariadbclient/mysql_client.cc b/server/modules/protocol/MySQL/mariadbclient/mysql_client.cc index b82afcd11..23ffe3261 100644 --- a/server/modules/protocol/MySQL/mariadbclient/mysql_client.cc +++ b/server/modules/protocol/MySQL/mariadbclient/mysql_client.cc @@ -1434,7 +1434,7 @@ static void gw_process_one_new_client(DCB* client_dcb) else { // Move the rest of the initialization process to the owning worker - mxs::RoutingWorker* worker = static_cast(client_dcb->poll.owner); + mxs::RoutingWorker* worker = static_cast(client_dcb->owner); worker->execute([=]() { client_dcb->protocol = mysql_protocol_init(client_dcb, client_dcb->fd); diff --git a/server/modules/routing/binlogrouter/blr_master.cc b/server/modules/routing/binlogrouter/blr_master.cc index 8a15ce4eb..22cf9c42a 100644 --- a/server/modules/routing/binlogrouter/blr_master.cc +++ b/server/modules/routing/binlogrouter/blr_master.cc @@ -215,7 +215,7 @@ static void blr_start_master(void* data) * 'client' is the fake DCB that emulates a client session: * we need to set the poll.thread.id for the "dummy client" */ - client->session->client_dcb->poll.owner = mxs_rworker_get_current(); + client->session->client_dcb->owner = mxs_rworker_get_current(); /* Connect to configured master server */ if ((router->master = dcb_connect(router->service->dbref->server,