MXS-2196: Remove listener DCB type

Removed all code and references to the listener type DCBs. Also removed
the DCB type macros and replaced them with a member function.
This commit is contained in:
Markus Mäkelä
2018-12-03 08:03:06 +02:00
parent 7aa60b4a24
commit 13769a0e8c
2 changed files with 61 additions and 160 deletions

View File

@ -95,35 +95,11 @@ typedef enum
typedef enum typedef enum
{ {
DCB_ROLE_SERVICE_LISTENER, /*< Receives initial connect requests from clients */
DCB_ROLE_CLIENT_HANDLER, /*< Serves dedicated client */ DCB_ROLE_CLIENT_HANDLER, /*< Serves dedicated client */
DCB_ROLE_BACKEND_HANDLER, /*< Serves back end connection */ DCB_ROLE_BACKEND_HANDLER, /*< Serves back end connection */
DCB_ROLE_INTERNAL /*< Internal DCB not connected to the outside */ DCB_ROLE_INTERNAL /*< Internal DCB not connected to the outside */
} dcb_role_t; } dcb_role_t;
#define STRDCBROLE(r) \
((r) == DCB_ROLE_SERVICE_LISTENER ? "DCB_ROLE_SERVICE_LISTENER" \
: ((r) == DCB_ROLE_CLIENT_HANDLER ? "DCB_ROLE_CLIENT_HANDLER" \
: ((r) \
== DCB_ROLE_BACKEND_HANDLER \
? "DCB_ROLE_BACKEND_HANDLER" \
: (( \
r) \
== \
DCB_ROLE_INTERNAL \
? \
"DCB_ROLE_INTERNAL" \
: \
"UNKNOWN DCB ROLE"))))
#define DCB_STRTYPE(dcb) \
(dcb->dcb_role == DCB_ROLE_CLIENT_HANDLER ? "Client DCB" \
: dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER ? "Backend DCB" \
: dcb->dcb_role \
== DCB_ROLE_SERVICE_LISTENER ? "Listener DCB" \
: \
dcb->dcb_role == DCB_ROLE_INTERNAL ? "Internal DCB" : "Unknown DCB")
/** /**
* Callback reasons for the DCB callback mechanism. * Callback reasons for the DCB callback mechanism.
*/ */
@ -180,6 +156,11 @@ struct DCB : public MXB_POLL_DATA
DCB(dcb_role_t role, const SListener& listener, SERVICE* service); DCB(dcb_role_t role, const SListener& listener, SERVICE* service);
~DCB(); ~DCB();
/**
* DCB type in string form
*/
const char* type();
bool dcb_errhandle_called = false; /**< this can be called only once */ bool dcb_errhandle_called = false; /**< this can be called only once */
dcb_role_t dcb_role; dcb_role_t dcb_role;
int fd = DCBFD_CLOSED; /**< The descriptor */ int fd = DCBFD_CLOSED; /**< The descriptor */

View File

@ -142,21 +142,9 @@ uint64_t dcb_get_session_id(DCB* dcb)
static MXB_WORKER* get_dcb_owner(dcb_role_t role) static MXB_WORKER* get_dcb_owner(dcb_role_t role)
{ {
MXB_WORKER* owner; /** The DCB is owned by the thread that allocates it */
mxb_assert(RoutingWorker::get_current_id() != -1);
if (role == DCB_ROLE_SERVICE_LISTENER) return RoutingWorker::get_current();
{
/** All listeners are owned by the main thread (i.e. thread no. 0) */
owner = RoutingWorker::get(RoutingWorker::MAIN);
}
else
{
/** Otherwise the DCB is owned by the thread that allocates it */
mxb_assert(RoutingWorker::get_current_id() != -1);
owner = RoutingWorker::get_current();
}
return owner;
} }
DCB::DCB(dcb_role_t role, const SListener& listener, SERVICE* service) DCB::DCB(dcb_role_t role, const SListener& listener, SERVICE* service)
@ -272,9 +260,7 @@ static void dcb_final_free(DCB* dcb)
* been closed. * been closed.
*/ */
mxb_assert(dcb->dcb_role == DCB_ROLE_CLIENT_HANDLER mxb_assert(dcb->dcb_role == DCB_ROLE_CLIENT_HANDLER || dcb->dcb_role == DCB_ROLE_INTERNAL);
|| dcb->dcb_role == DCB_ROLE_SERVICE_LISTENER
|| dcb->dcb_role == DCB_ROLE_INTERNAL);
session_put_ref(local_session); session_put_ref(local_session);
return; return;
} }
@ -1031,10 +1017,6 @@ static void log_illegal_dcb(DCB* dcb)
connected_to = "Internal DCB"; connected_to = "Internal DCB";
break; break;
case DCB_ROLE_SERVICE_LISTENER:
connected_to = dcb->service->name;
break;
default: default:
connected_to = "Illegal DCB role"; connected_to = "Illegal DCB role";
break; break;
@ -1103,17 +1085,10 @@ void dcb_close(DCB* dcb)
{ {
dcb->n_close = 1; dcb->n_close = 1;
if (dcb->dcb_role == DCB_ROLE_SERVICE_LISTENER) RoutingWorker* worker = static_cast<RoutingWorker*>(dcb->owner);
{ mxb_assert(worker);
dcb_final_close(dcb);
}
else
{
RoutingWorker* worker = static_cast<RoutingWorker*>(dcb->owner);
mxb_assert(worker);
worker->register_zombie(dcb); worker->register_zombie(dcb);
}
} }
else else
{ {
@ -1226,8 +1201,8 @@ void dcb_final_close(DCB* dcb)
} }
else else
{ {
// Only listeners are closed with a fd of -1 // Only internal DCBs are closed with a fd of -1
mxb_assert(dcb->dcb_role == DCB_ROLE_SERVICE_LISTENER || dcb->dcb_role == DCB_ROLE_INTERNAL); mxb_assert(dcb->dcb_role == DCB_ROLE_INTERNAL);
} }
dcb->state = DCB_STATE_DISCONNECTED; dcb->state = DCB_STATE_DISCONNECTED;
@ -1810,7 +1785,7 @@ static int gw_write(DCB* dcb, GWBUF* writeq, bool* stop_writing)
#endif #endif
{ {
MXS_ERROR("Write to %s %s in state %s failed: %d, %s", MXS_ERROR("Write to %s %s in state %s failed: %d, %s",
DCB_STRTYPE(dcb), dcb->type(),
dcb->remote, dcb->remote,
STRDCBSTATE(dcb->state), STRDCBSTATE(dcb->state),
saved_errno, saved_errno,
@ -2345,11 +2320,7 @@ char* dcb_role_name(DCB* dcb)
if (name) if (name)
{ {
name[0] = 0; name[0] = 0;
if (DCB_ROLE_SERVICE_LISTENER == dcb->dcb_role) if (DCB_ROLE_CLIENT_HANDLER == dcb->dcb_role)
{
strcat(name, "Service Listener");
}
else if (DCB_ROLE_CLIENT_HANDLER == dcb->dcb_role)
{ {
strcat(name, "Client Request Handler"); strcat(name, "Client Request Handler");
} }
@ -2380,8 +2351,7 @@ static void dcb_add_to_list_cb(int thread_id, void* data)
static void dcb_add_to_list(DCB* dcb) static void dcb_add_to_list(DCB* dcb)
{ {
if (dcb->dcb_role != DCB_ROLE_SERVICE_LISTENER if (dcb->thread.next == NULL && dcb->thread.tail == NULL)
|| (dcb->thread.next == NULL && dcb->thread.tail == NULL))
{ {
/** /**
* This is a DCB which is either not a listener or it is a listener which * This is a DCB which is either not a listener or it is a listener which
@ -2593,8 +2563,7 @@ int dcb_get_port(const DCB* dcb)
static uint32_t dcb_process_poll_events(DCB* dcb, uint32_t events) static uint32_t dcb_process_poll_events(DCB* dcb, uint32_t events)
{ {
RoutingWorker* owner = static_cast<RoutingWorker*>(dcb->owner); RoutingWorker* owner = static_cast<RoutingWorker*>(dcb->owner);
mxb_assert(owner == RoutingWorker::get_current() mxb_assert(owner == RoutingWorker::get_current());
|| dcb->dcb_role == DCB_ROLE_SERVICE_LISTENER);
uint32_t rc = MXB_POLL_NOP; uint32_t rc = MXB_POLL_NOP;
@ -2607,13 +2576,6 @@ static uint32_t dcb_process_poll_events(DCB* dcb, uint32_t events)
return rc; return rc;
} }
MXS_DEBUG("%lu [poll_waitevents] event %d dcb %p "
"role %s",
pthread_self(),
events,
dcb,
STRDCBROLE(dcb->dcb_role));
if (dcb->n_close != 0) if (dcb->n_close != 0)
{ {
MXS_WARNING("Events reported for dcb(%p), owned by %d, that has been closed %" PRIu32 " times.", MXS_WARNING("Events reported for dcb(%p), owned by %d, that has been closed %" PRIu32 " times.",
@ -3038,82 +3000,44 @@ static bool add_fd_to_routing_workers(int fd, uint32_t events, MXB_POLL_DATA* da
static bool dcb_add_to_worker(Worker* worker, DCB* dcb, uint32_t events) static bool dcb_add_to_worker(Worker* worker, DCB* dcb, uint32_t events)
{ {
mxb_assert(worker == dcb->owner);
bool rv = false; bool rv = false;
if (!worker) if (worker == RoutingWorker::get_current())
{ {
// No specific worker; indicates the DCB is a listening DCB. // If the DCB should end up on the current thread, we can both add it
mxb_assert(dcb->dcb_role == DCB_ROLE_SERVICE_LISTENER); // to the epoll-instance and to the DCB book-keeping immediately.
if (worker->add_fd(dcb->fd, events, (MXB_POLL_DATA*)dcb))
// A listening DCB, we add it immediately.
if (add_fd_to_routing_workers(dcb->fd, events, (MXB_POLL_DATA*)dcb))
{ {
// If this takes place on the main thread (all listening DCBs are dcb_add_to_list(dcb);
// stored on the main thread)...
if (dcb->owner == RoutingWorker::get_current())
{
// ..we'll add it immediately to the list,
dcb_add_to_list(dcb);
}
else
{
// otherwise we must move the adding to the main thread.
// TODO: Separate listening and other DCBs, as this is a mess.
RoutingWorker* worker = static_cast<RoutingWorker*>(dcb->owner);
mxb_assert(worker);
intptr_t arg1 = (intptr_t)dcb_add_to_list_cb;
intptr_t arg2 = (intptr_t)dcb;
if (!worker->post_message(MXB_WORKER_MSG_CALL, arg1, arg2))
{
MXS_ERROR("Could not post listening DCB for book-keeping to worker.");
}
}
rv = true; rv = true;
} }
} }
else else
{ {
mxb_assert(worker == dcb->owner); // Otherwise we'll move the whole operation to the correct worker.
// This will only happen for "cli" and "maxinfo" services that must
// be served by one thread as there otherwise deadlocks can occur.
AddDcbToWorker* task = new(std::nothrow) AddDcbToWorker(dcb, events);
mxb_assert(task);
if (worker == RoutingWorker::get_current()) if (task)
{ {
// If the DCB should end up on the current thread, we can both add it Worker* worker = static_cast<RoutingWorker*>(dcb->owner);
// to the epoll-instance and to the DCB book-keeping immediately. mxb_assert(worker);
if (worker->add_fd(dcb->fd, events, (MXB_POLL_DATA*)dcb))
if (worker->execute(std::unique_ptr<AddDcbToWorker>(task), Worker::EXECUTE_QUEUED))
{ {
dcb_add_to_list(dcb);
rv = true; rv = true;
} }
else
{
MXS_ERROR("Could not post task to add DCB to worker.");
}
} }
else else
{ {
// Otherwise we'll move the whole operation to the correct worker. MXS_OOM();
// This will only happen for "cli" and "maxinfo" services that must
// be served by one thread as there otherwise deadlocks can occur.
AddDcbToWorker* task = new(std::nothrow) AddDcbToWorker(dcb, events);
mxb_assert(task);
if (task)
{
Worker* worker = static_cast<RoutingWorker*>(dcb->owner);
mxb_assert(worker);
if (worker->execute(std::unique_ptr<AddDcbToWorker>(task), Worker::EXECUTE_QUEUED))
{
rv = true;
}
else
{
MXS_ERROR("Could not post task to add DCB to worker.");
}
}
else
{
MXS_OOM();
}
} }
} }
@ -3136,17 +3060,7 @@ int poll_add_dcb(DCB* dcb)
dcb_state_t new_state; dcb_state_t new_state;
RoutingWorker* owner = nullptr; RoutingWorker* owner = nullptr;
if (dcb->dcb_role == DCB_ROLE_SERVICE_LISTENER) if (dcb->dcb_role == DCB_ROLE_CLIENT_HANDLER)
{
/**
* Listeners are always added in level triggered mode. This will cause
* new events to be generated as long as there is at least one connection
* to accept.
*/
events = EPOLLIN;
new_state = DCB_STATE_LISTENING;
}
else if (dcb->dcb_role == DCB_ROLE_CLIENT_HANDLER)
{ {
if (strcasecmp(dcb->service->routerModule, "cli") == 0 if (strcasecmp(dcb->service->routerModule, "cli") == 0
|| strcasecmp(dcb->service->routerModule, "maxinfo") == 0) || strcasecmp(dcb->service->routerModule, "maxinfo") == 0)
@ -3241,22 +3155,12 @@ int poll_remove_dcb(DCB* dcb)
{ {
rc = -1; rc = -1;
if (dcb->dcb_role == DCB_ROLE_SERVICE_LISTENER) Worker* worker = static_cast<Worker*>(dcb->owner);
{ mxb_assert(worker);
if (RoutingWorker::remove_shared_fd(dcbfd))
{
rc = 0;
}
}
else
{
Worker* worker = static_cast<Worker*>(dcb->owner);
mxb_assert(worker);
if (worker->remove_fd(dcbfd)) if (worker->remove_fd(dcbfd))
{ {
rc = 0; rc = 0;
}
} }
} }
return rc; return rc;
@ -3362,3 +3266,19 @@ json_t* dcb_to_json(DCB* dcb)
return obj; return obj;
} }
const char* DCB::type()
{
switch (dcb_role)
{
case DCB_ROLE_CLIENT_HANDLER:
return "Client DCB";
case DCB_ROLE_BACKEND_HANDLER:
return "Backend DCB";
case DCB_ROLE_INTERNAL:
return "Internal DCB";
default:
mxb_assert(!true);
return "Unknown DCB";
}
}