MXS-1983: Retry message posting
If the posting of a message to a message queue fails due to the queue being full, it is retried for a limited number of times. This is a temporary fix to the problem that fixing MXS-1948 introduced. A proper solution that leverages SO_REUSEPORT should be implemented in the next major release.
This commit is contained in:
@ -175,12 +175,57 @@ bool MessageQueue::post(const Message& message) const
|
|||||||
ss_dassert(m_pWorker);
|
ss_dassert(m_pWorker);
|
||||||
if (m_pWorker)
|
if (m_pWorker)
|
||||||
{
|
{
|
||||||
ssize_t n = write(m_write_fd, &message, sizeof(message));
|
/**
|
||||||
|
* This is a stopgap measure to solve MXS-1983 that causes Resource temporarily
|
||||||
|
* unavailable errors. The errors are caused by the pipe buffer being too small to
|
||||||
|
* hold all worker messages. By retrying a limited number of times before giving
|
||||||
|
* up, the success rate for posted messages under heavy load increases
|
||||||
|
* significantly.
|
||||||
|
*/
|
||||||
|
int fast = 0;
|
||||||
|
int slow = 0;
|
||||||
|
const int fast_size = 100;
|
||||||
|
const int slow_limit = 3;
|
||||||
|
ssize_t n;
|
||||||
|
|
||||||
|
while (true)
|
||||||
|
{
|
||||||
|
n = write(m_write_fd, &message, sizeof(message));
|
||||||
rv = (n == sizeof(message));
|
rv = (n == sizeof(message));
|
||||||
|
|
||||||
|
if (n == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
|
||||||
|
{
|
||||||
|
if (++fast > fast_size)
|
||||||
|
{
|
||||||
|
fast = 0;
|
||||||
|
|
||||||
|
if (++slow >= slow_limit)
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
sched_yield()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (n == -1)
|
if (n == -1)
|
||||||
{
|
{
|
||||||
MXS_ERROR("Failed to write message: %d, %s", errno, mxs_strerror(errno));
|
MXS_ERROR("Failed to write message: %d, %s", errno, mxs_strerror(errno));
|
||||||
|
|
||||||
|
static bool warn_pipe_buffer_size = true;
|
||||||
|
|
||||||
|
if ((errno == EAGAIN || errno == EWOULDBLOCK) && warn_pipe_buffer_size)
|
||||||
|
{
|
||||||
|
MXS_ERROR("Consider increasing pipe buffer size (sysctl fs.pipe-max-size)");
|
||||||
|
warn_pipe_buffer_size = false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
|||||||
Reference in New Issue
Block a user