Merge branch '2.3' into develop
This commit is contained in:
@ -65,6 +65,12 @@ using std::string;
|
||||
#define DCB_EH_NOTICE(s, p)
|
||||
#endif
|
||||
|
||||
#ifdef EPOLLRDHUP
|
||||
constexpr uint32_t poll_events = EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLHUP | EPOLLET;
|
||||
#else
|
||||
constexpr uint32_t poll_events = EPOLLIN | EPOLLOUT | EPOLLHUP | EPOLLET;
|
||||
#endif
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
@ -2364,6 +2370,7 @@ static void dcb_add_to_list(DCB* dcb)
|
||||
}
|
||||
else
|
||||
{
|
||||
mxb_assert(this_unit.all_dcbs[id]->thread.tail->thread.next != dcb);
|
||||
this_unit.all_dcbs[id]->thread.tail->thread.next = dcb;
|
||||
this_unit.all_dcbs[id]->thread.tail = dcb;
|
||||
}
|
||||
@ -2530,6 +2537,8 @@ void dcb_foreach_local(bool (* func)(DCB* dcb, void* data), void* data)
|
||||
{
|
||||
if (dcb->session)
|
||||
{
|
||||
mxb_assert(dcb->thread.next != dcb);
|
||||
|
||||
if (!func(dcb, data))
|
||||
{
|
||||
break;
|
||||
@ -3052,13 +3061,7 @@ int poll_add_dcb(DCB* dcb)
|
||||
{
|
||||
dcb_sanity_check(dcb);
|
||||
|
||||
uint32_t events = 0;
|
||||
|
||||
#ifdef EPOLLRDHUP
|
||||
events = EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLHUP | EPOLLET;
|
||||
#else
|
||||
events = EPOLLIN | EPOLLOUT | EPOLLHUP | EPOLLET;
|
||||
#endif
|
||||
uint32_t events = poll_events;
|
||||
|
||||
/** Choose new state and worker thread ID according to the role of DCB. */
|
||||
dcb_state_t new_state;
|
||||
@ -3190,13 +3193,23 @@ DCB* dcb_get_current()
|
||||
static int upstream_throttle_callback(DCB* dcb, DCB_REASON reason, void* userdata)
|
||||
{
|
||||
DCB* client_dcb = dcb->session->client_dcb;
|
||||
mxb::Worker* worker = static_cast<mxb::Worker*>(client_dcb->owner);
|
||||
|
||||
// The fd is removed manually here due to the fact that poll_add_dcb causes the DCB to be added to the
|
||||
// worker's list of DCBs but poll_remove_dcb doesn't remove it from it. This is due to the fact that the
|
||||
// DCBs are only removed from the list when they are closed.
|
||||
if (reason == DCB_REASON_HIGH_WATER)
|
||||
{
|
||||
poll_remove_dcb(client_dcb);
|
||||
MXS_INFO("High water mark hit for '%s'@'%s', not reading data until low water mark is hit",
|
||||
client_dcb->user, client_dcb->remote);
|
||||
worker->remove_fd(client_dcb->fd);
|
||||
client_dcb->state = DCB_STATE_NOPOLLING;
|
||||
}
|
||||
else if (reason == DCB_REASON_LOW_WATER)
|
||||
{
|
||||
poll_add_dcb(client_dcb);
|
||||
MXS_INFO("Low water mark hit for '%s'@'%s', accepting new data", client_dcb->user, client_dcb->remote);
|
||||
worker->add_fd(client_dcb->fd, poll_events, (MXB_POLL_DATA*)client_dcb);
|
||||
client_dcb->state = DCB_STATE_POLLING;
|
||||
}
|
||||
|
||||
return 0;
|
||||
@ -3208,7 +3221,13 @@ bool backend_dcb_remove_func(DCB* dcb, void* data)
|
||||
|
||||
if (dcb->session == session && dcb->role == DCB::Role::BACKEND)
|
||||
{
|
||||
poll_remove_dcb(dcb);
|
||||
DCB* client_dcb = dcb->session->client_dcb;
|
||||
MXS_INFO("High water mark hit for connection to '%s' from %s'@'%s', not reading data until low water "
|
||||
"mark is hit", dcb->server->name(), client_dcb->user, client_dcb->remote);
|
||||
|
||||
mxb::Worker* worker = static_cast<mxb::Worker*>(dcb->owner);
|
||||
worker->remove_fd(dcb->fd);
|
||||
dcb->state = DCB_STATE_NOPOLLING;
|
||||
}
|
||||
|
||||
return true;
|
||||
@ -3220,7 +3239,13 @@ bool backend_dcb_add_func(DCB* dcb, void* data)
|
||||
|
||||
if (dcb->session == session && dcb->role == DCB::Role::BACKEND)
|
||||
{
|
||||
poll_add_dcb(dcb);
|
||||
DCB* client_dcb = dcb->session->client_dcb;
|
||||
MXS_INFO("Low water mark hit for connection to '%s' from '%s'@'%s', accepting new data",
|
||||
dcb->server->name(), client_dcb->user, client_dcb->remote);
|
||||
|
||||
mxb::Worker* worker = static_cast<mxb::Worker*>(dcb->owner);
|
||||
worker->add_fd(dcb->fd, poll_events, (MXB_POLL_DATA*)dcb);
|
||||
dcb->state = DCB_STATE_POLLING;
|
||||
}
|
||||
|
||||
return true;
|
||||
|
@ -390,8 +390,18 @@ uint32_t QueryClassifier::ps_get_type(std::string id) const
|
||||
|
||||
void QueryClassifier::ps_erase(GWBUF* buffer)
|
||||
{
|
||||
m_ps_handles.erase(qc_mysql_extract_ps_id(buffer));
|
||||
m_sPs_manager->erase(buffer);
|
||||
if (qc_mysql_is_ps_command(mxs_mysql_get_command(buffer)))
|
||||
{
|
||||
// Erase the type of the statement stored with the internal ID
|
||||
m_sPs_manager->erase(ps_id_internal_get(buffer));
|
||||
// ... and then erase the external to internal ID mapping
|
||||
m_ps_handles.erase(qc_mysql_extract_ps_id(buffer));
|
||||
}
|
||||
else
|
||||
{
|
||||
// Not a PS command, we don't need the ID mapping
|
||||
m_sPs_manager->erase(buffer);
|
||||
}
|
||||
}
|
||||
|
||||
bool QueryClassifier::query_type_is_read_only(uint32_t qtype) const
|
||||
|
@ -1441,7 +1441,8 @@ static int backend_write_delayqueue(DCB* dcb, GWBUF* buffer)
|
||||
|
||||
if (rc == 0)
|
||||
{
|
||||
do_handle_error(dcb, ERRACT_NEW_CONNECTION, "Lost connection to backend server.");
|
||||
do_handle_error(dcb, ERRACT_NEW_CONNECTION,
|
||||
"Lost connection to backend server while writing delay queue.");
|
||||
}
|
||||
|
||||
return rc;
|
||||
|
@ -417,3 +417,12 @@ mxs::PRWBackends::iterator find_best_backend(mxs::PRWBackends& backends,
|
||||
* The following are implemented in rwsplit_tmp_table_multi.c
|
||||
*/
|
||||
void close_all_connections(mxs::PRWBackends& backends);
|
||||
|
||||
/**
|
||||
* Utility function for extracting error messages from buffers
|
||||
*
|
||||
* @param buffer Buffer containing an error
|
||||
*
|
||||
* @return String representation of the error
|
||||
*/
|
||||
std::string extract_error(GWBUF* buffer);
|
||||
|
@ -449,6 +449,7 @@ bool RWSplitSession::route_session_write(GWBUF* querybuf, uint8_t command, uint3
|
||||
}
|
||||
else if (qc_query_is_type(type, QUERY_TYPE_DEALLOC_PREPARE))
|
||||
{
|
||||
mxb_assert(!mxs_mysql_is_ps_command(m_qc.current_route_info().command()));
|
||||
m_qc.ps_erase(querybuf);
|
||||
}
|
||||
|
||||
|
@ -29,19 +29,24 @@ using namespace maxscale;
|
||||
*/
|
||||
|
||||
|
||||
static std::string extract_error(GWBUF* buffer)
|
||||
std::string extract_error(GWBUF* buffer)
|
||||
{
|
||||
std::string rval;
|
||||
|
||||
if (MYSQL_IS_ERROR_PACKET(((uint8_t*)GWBUF_DATA(buffer))))
|
||||
{
|
||||
size_t replylen = MYSQL_GET_PAYLOAD_LEN(GWBUF_DATA(buffer));
|
||||
size_t replylen = MYSQL_GET_PAYLOAD_LEN(GWBUF_DATA(buffer)) + MYSQL_HEADER_LEN;
|
||||
char replybuf[replylen];
|
||||
gwbuf_copy_data(buffer, 0, sizeof(replybuf), (uint8_t*)replybuf);
|
||||
std::string err;
|
||||
std::string msg;
|
||||
err.append(replybuf + 8, 5);
|
||||
msg.append(replybuf + 13, replylen - 4 - 5);
|
||||
|
||||
/**
|
||||
* The payload starts with a one byte command followed by a two byte error code, a six byte state and
|
||||
* a human-readable string that spans the rest of the packet.
|
||||
*/
|
||||
err.append(replybuf + MYSQL_HEADER_LEN + 3, 6);
|
||||
msg.append(replybuf + MYSQL_HEADER_LEN + 3 + 6, replylen - MYSQL_HEADER_LEN - 3 - 6);
|
||||
rval = err + ": " + msg;
|
||||
}
|
||||
|
||||
|
@ -941,7 +941,10 @@ void RWSplitSession::handleError(GWBUF* errmsgbuf,
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_ERROR("Lost connection to the master server, closing session.%s", errmsg.c_str());
|
||||
int64_t idle = mxs_clock() - backend->dcb()->last_read;
|
||||
MXS_ERROR("Lost connection to the master server, closing session.%s "
|
||||
"Connection has been idle for %.1f seconds. Error caused by: %s",
|
||||
errmsg.c_str(), (float)idle / 10.f, extract_error(errmsgbuf).c_str());
|
||||
}
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user