Merge branch '2.2' into develop

This commit is contained in:
Johan Wikman
2018-01-26 10:26:22 +02:00
11 changed files with 435 additions and 49 deletions

View File

@ -53,12 +53,21 @@ uint64_t atomic_add_uint64(uint64_t *variable, int64_t value)
#endif
}
int atomic_load_int32(const int *variable)
int atomic_load_int(const int *variable)
{
#ifdef MXS_USE_ATOMIC_BUILTINS
return __atomic_load_n(variable, __ATOMIC_SEQ_CST);
#else
return __sync_fetch_and_or((volatile int *)variable, 0);
return __sync_fetch_and_or(variable, 0);
#endif
}
int32_t atomic_load_int32(const int32_t *variable)
{
#ifdef MXS_USE_ATOMIC_BUILTINS
return __atomic_load_n(variable, __ATOMIC_SEQ_CST);
#else
return __sync_fetch_and_or(variable, 0);
#endif
}
@ -67,7 +76,16 @@ int64_t atomic_load_int64(const int64_t *variable)
#ifdef MXS_USE_ATOMIC_BUILTINS
return __atomic_load_n(variable, __ATOMIC_SEQ_CST);
#else
return __sync_fetch_and_or((volatile int *)variable, 0);
return __sync_fetch_and_or(variable, 0);
#endif
}
uint32_t atomic_load_uint32(const uint32_t *variable)
{
#ifdef MXS_USE_ATOMIC_BUILTINS
return __atomic_load_n(variable, __ATOMIC_SEQ_CST);
#else
return __sync_fetch_and_or(variable, 0);
#endif
}
@ -76,7 +94,7 @@ uint64_t atomic_load_uint64(const uint64_t *variable)
#ifdef MXS_USE_ATOMIC_BUILTINS
return __atomic_load_n(variable, __ATOMIC_SEQ_CST);
#else
return __sync_fetch_and_or((volatile int *)variable, 0);
return __sync_fetch_and_or(variable, 0);
#endif
}

View File

@ -172,6 +172,14 @@ public:
return m_statistics;
}
/**
* Return the count of descriptors.
*
* @param pnCurrent On output the current number of descriptors.
* @param pnTotal On output the total number of descriptors.
*/
void get_descriptor_counts(uint32_t* pnCurrent, uint64_t* pnTotal);
/**
* Add a file descriptor to the epoll instance of the worker.
*
@ -508,21 +516,23 @@ private:
uint32_t handle_epoll_events(uint32_t events);
private:
int m_id; /*< The id of the worker. */
state_t m_state; /*< The state of the worker */
int m_epoll_fd; /*< The epoll file descriptor. */
STATISTICS m_statistics; /*< Worker statistics. */
MessageQueue* m_pQueue; /*< The message queue of the worker. */
THREAD m_thread; /*< The thread handle of the worker. */
bool m_started; /*< Whether the thread has been started or not. */
bool m_should_shutdown; /*< Whether shutdown should be performed. */
bool m_shutdown_initiated; /*< Whether shutdown has been initated. */
SessionsById m_sessions; /*< A mapping of session_id->MXS_SESSION. The map
* should contain sessions exclusive to this
* worker and not e.g. listener sessions. For now,
* it's up to the protocol to decide whether a new
* session is added to the map. */
Zombies m_zombies; /*< DCBs to be deleted. */
int m_id; /*< The id of the worker. */
state_t m_state; /*< The state of the worker */
int m_epoll_fd; /*< The epoll file descriptor. */
STATISTICS m_statistics; /*< Worker statistics. */
MessageQueue* m_pQueue; /*< The message queue of the worker. */
THREAD m_thread; /*< The thread handle of the worker. */
bool m_started; /*< Whether the thread has been started or not. */
bool m_should_shutdown; /*< Whether shutdown should be performed. */
bool m_shutdown_initiated; /*< Whether shutdown has been initated. */
SessionsById m_sessions; /*< A mapping of session_id->MXS_SESSION. The map
* should contain sessions exclusive to this
* worker and not e.g. listener sessions. For now,
* it's up to the protocol to decide whether a new
* session is added to the map. */
Zombies m_zombies; /*< DCBs to be deleted. */
uint32_t m_nCurrent_descriptors; /*< Current number of descriptors. */
uint64_t m_nTotal_descriptors; /*< Total number of descriptors. */
};
}

View File

@ -240,8 +240,8 @@ dShowThreads(DCB *dcb)
{
dcb_printf(dcb, "Polling Threads.\n\n");
dcb_printf(dcb, " ID | State \n");
dcb_printf(dcb, "----+------------\n");
dcb_printf(dcb, " ID | State | #descriptors (curr) | #descriptors (tot) |\n");
dcb_printf(dcb, "----+------------+---------------------+---------------------+\n");
for (int i = 0; i < n_threads; i++)
{
Worker* worker = Worker::get(i);
@ -271,7 +271,12 @@ dShowThreads(DCB *dcb)
ss_dassert(!true);
}
dcb_printf(dcb, " %2d | %s\n", i, state);
uint32_t nCurrent;
uint64_t nTotal;
worker->get_descriptor_counts(&nCurrent, &nTotal);
dcb_printf(dcb, " %2d | %10s | %19" PRIu32 " | %19" PRIu64 " |\n", i, state, nCurrent, nTotal);
}
}

View File

@ -162,6 +162,8 @@ Worker::Worker(int id,
, m_started(false)
, m_should_shutdown(false)
, m_shutdown_initiated(false)
, m_nCurrent_descriptors(0)
, m_nTotal_descriptors(0)
{
MXS_POLL_DATA::handler = &Worker::epoll_instance_handler;
MXS_POLL_DATA::thread.id = id;
@ -421,6 +423,12 @@ int64_t Worker::get_one_statistic(POLL_STAT what)
return rv;
}
void Worker::get_descriptor_counts(uint32_t* pnCurrent, uint64_t* pnTotal)
{
*pnCurrent = atomic_load_uint32(&m_nCurrent_descriptors);
*pnTotal = atomic_load_uint64(&m_nTotal_descriptors);
}
bool Worker::add_fd(int fd, uint32_t events, MXS_POLL_DATA* pData)
{
bool rv = true;
@ -435,7 +443,12 @@ bool Worker::add_fd(int fd, uint32_t events, MXS_POLL_DATA* pData)
pData->thread.id = m_id;
if (epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, fd, &ev) != 0)
if (epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, fd, &ev) == 0)
{
atomic_add_uint32(&m_nCurrent_descriptors, 1);
atomic_add_uint64(&m_nTotal_descriptors, 1);
}
else
{
poll_resolve_error(fd, errno, EPOLL_CTL_ADD);
rv = false;
@ -479,7 +492,11 @@ bool Worker::remove_fd(int fd)
struct epoll_event ev = {};
if (epoll_ctl(m_epoll_fd, EPOLL_CTL_DEL, fd, &ev) != 0)
if (epoll_ctl(m_epoll_fd, EPOLL_CTL_DEL, fd, &ev) == 0)
{
atomic_add_uint32(&m_nCurrent_descriptors, -1);
}
else
{
poll_resolve_error(fd, errno, EPOLL_CTL_DEL);
rv = false;