Markus Mäkelä 7dde0edb54
Clean up unexpected error handling in readwritesplit
By using the Error class, the code can be cleaned up and simplified.
2019-06-14 15:18:01 +03:00

479 lines
12 KiB
C++

/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2022-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include <maxscale/protocol/rwbackend.hh>
#include <maxscale/modutil.hh>
#include <maxscale/protocol/mysql.hh>
using Iter = mxs::Buffer::iterator;
namespace maxscale
{
RWBackend::RWBackend(SERVER_REF* ref)
: mxs::Backend(ref)
, m_reply_state(REPLY_STATE_DONE)
, m_modutil_state{0}
, m_command(0)
, m_opening_cursor(false)
, m_expected_rows(0)
, m_local_infile_requested(false)
{
}
RWBackend::~RWBackend()
{
}
bool RWBackend::execute_session_command()
{
m_command = next_session_command()->get_command();
bool expect_response = mxs_mysql_command_will_respond(m_command);
bool rval = mxs::Backend::execute_session_command();
if (rval && expect_response)
{
set_reply_state(REPLY_STATE_START);
}
return rval;
}
bool RWBackend::continue_session_command(GWBUF* buffer)
{
return Backend::write(buffer, NO_RESPONSE);
}
void RWBackend::add_ps_handle(uint32_t id, uint32_t handle)
{
m_ps_handles[id] = handle;
MXS_INFO("PS response for %s: %u -> %u", name(), id, handle);
}
uint32_t RWBackend::get_ps_handle(uint32_t id) const
{
BackendHandleMap::const_iterator it = m_ps_handles.find(id);
if (it != m_ps_handles.end())
{
return it->second;
}
return 0;
}
bool RWBackend::write(GWBUF* buffer, response_type type)
{
uint32_t len = mxs_mysql_get_packet_len(buffer);
bool was_large_query = m_large_query;
m_large_query = len == MYSQL_PACKET_LENGTH_MAX;
if (was_large_query)
{
return mxs::Backend::write(buffer, Backend::NO_RESPONSE);
}
if (type == mxs::Backend::EXPECT_RESPONSE)
{
/** The server will reply to this command */
set_reply_state(REPLY_STATE_START);
}
uint8_t cmd = mxs_mysql_get_command(buffer);
m_command = cmd;
if (mxs_mysql_is_ps_command(cmd))
{
uint32_t id = mxs_mysql_extract_ps_id(buffer);
BackendHandleMap::iterator it = m_ps_handles.find(id);
if (it != m_ps_handles.end())
{
/** Replace the client handle with the real PS handle */
uint8_t* ptr = GWBUF_DATA(buffer) + MYSQL_PS_ID_OFFSET;
gw_mysql_set_byte4(ptr, it->second);
if (cmd == MXS_COM_STMT_EXECUTE)
{
// Extract the flag byte after the statement ID
uint8_t flags = 0;
gwbuf_copy_data(buffer, MYSQL_PS_ID_OFFSET + MYSQL_PS_ID_SIZE, 1, &flags);
// Any non-zero flag value means that we have an open cursor
m_opening_cursor = flags != 0;
}
else if (cmd == MXS_COM_STMT_CLOSE)
{
m_ps_handles.erase(it);
}
else if (cmd == MXS_COM_STMT_FETCH)
{
// Number of rows to fetch is a 4 byte integer after the ID
uint8_t buf[4];
gwbuf_copy_data(buffer, MYSQL_PS_ID_OFFSET + MYSQL_PS_ID_SIZE, 4, buf);
m_expected_rows = gw_mysql_get_byte4(buf);
}
}
}
return mxs::Backend::write(buffer, type);
}
void RWBackend::close(close_type type)
{
m_reply_state = REPLY_STATE_DONE;
mxs::Backend::close(type);
}
bool RWBackend::consume_fetched_rows(GWBUF* buffer)
{
bool rval = false;
bool more = false;
int n_eof = modutil_count_signal_packets(buffer, 0, &more, &m_modutil_state);
// If the server responded with an error, n_eof > 0
if (n_eof > 0)
{
rval = true;
}
else
{
m_expected_rows -= modutil_count_packets(buffer);
mxb_assert(m_expected_rows >= 0);
rval = m_expected_rows == 0;
}
return rval;
}
static inline bool have_next_packet(GWBUF* buffer)
{
uint32_t len = MYSQL_GET_PAYLOAD_LEN(GWBUF_DATA(buffer)) + MYSQL_HEADER_LEN;
return gwbuf_length(buffer) > len;
}
uint64_t get_encoded_int(Iter it)
{
uint64_t len = *it++;
switch (len)
{
case 0xfc:
len = *it++;
len |= ((uint64_t)*it++) << 8;
break;
case 0xfd:
len = *it++;
len |= ((uint64_t)*it++) << 8;
len |= ((uint64_t)*it++) << 16;
break;
case 0xfe:
len = *it++;
len |= ((uint64_t)*it++) << 8;
len |= ((uint64_t)*it++) << 16;
len |= ((uint64_t)*it++) << 24;
len |= ((uint64_t)*it++) << 32;
len |= ((uint64_t)*it++) << 40;
len |= ((uint64_t)*it++) << 48;
len |= ((uint64_t)*it++) << 56;
break;
default:
break;
}
return len;
}
Iter skip_encoded_int(Iter it)
{
switch (*it)
{
case 0xfc:
it.advance(3);
break;
case 0xfd:
it.advance(4);
break;
case 0xfe:
it.advance(9);
break;
default:
++it;
break;
}
return it;
}
bool is_last_ok(Iter it)
{
++it; // Skip the command byte
it = skip_encoded_int(it); // Affected rows
it = skip_encoded_int(it); // Last insert ID
uint16_t status = *it++;
status |= (*it++) << 8;
return (status & SERVER_MORE_RESULTS_EXIST) == 0;
}
bool is_last_eof(Iter it)
{
std::advance(it, 3); // Skip the command byte and warning count
uint16_t status = *it++;
status |= (*it++) << 8;
return (status & SERVER_MORE_RESULTS_EXIST) == 0;
}
void RWBackend::process_reply_start(Iter it, Iter end)
{
uint8_t cmd = *it;
m_local_infile_requested = false;
switch (cmd)
{
case MYSQL_REPLY_OK:
if (is_last_ok(it))
{
// No more results
set_reply_state(REPLY_STATE_DONE);
}
break;
case MYSQL_REPLY_LOCAL_INFILE:
m_local_infile_requested = true;
set_reply_state(REPLY_STATE_DONE);
break;
case MYSQL_REPLY_ERR:
// Nothing ever follows an error packet
++it;
update_error(it, end);
set_reply_state(REPLY_STATE_DONE);
break;
case MYSQL_REPLY_EOF:
// EOF packets are never expected as the first response
mxb_assert(!true);
break;
default:
if (current_command() == MXS_COM_FIELD_LIST)
{
// COM_FIELD_LIST sends a strange kind of a result set
set_reply_state(REPLY_STATE_RSET_ROWS);
}
else
{
// Start of a result set
m_num_coldefs = get_encoded_int(it);
set_reply_state(REPLY_STATE_RSET_COLDEF);
}
break;
}
}
void RWBackend::process_packets(GWBUF* result)
{
mxs::Buffer buffer(result);
auto it = buffer.begin();
while (it != buffer.end())
{
// Extract packet length and command byte
uint32_t len = *it++;
len |= (*it++) << 8;
len |= (*it++) << 16;
++it; // Skip the sequence
mxb_assert(it != buffer.end());
auto end = it;
end.advance(len);
uint8_t cmd = *it;
// Ignore the tail end of a large packet large packet. Only resultsets can generate packets this large
// and we don't care what the contents are and thus it is safe to ignore it.
bool skip_next = m_skip_next;
m_skip_next = len == GW_MYSQL_MAX_PACKET_LEN;
if (skip_next)
{
it = end;
continue;
}
switch (m_reply_state)
{
case REPLY_STATE_START:
process_reply_start(it, end);
break;
case REPLY_STATE_DONE:
if (cmd == MYSQL_REPLY_ERR)
{
update_error(++it, end);
}
else
{
// This should never happen
MXS_ERROR("Unexpected result state. cmd: 0x%02hhx, len: %u", cmd, len);
mxb_assert(!true);
}
break;
case REPLY_STATE_RSET_COLDEF:
mxb_assert(m_num_coldefs > 0);
--m_num_coldefs;
if (m_num_coldefs == 0)
{
set_reply_state(REPLY_STATE_RSET_COLDEF_EOF);
// Skip this state when DEPRECATE_EOF capability is supported
}
break;
case REPLY_STATE_RSET_COLDEF_EOF:
mxb_assert(cmd == MYSQL_REPLY_EOF && len == MYSQL_EOF_PACKET_LEN - MYSQL_HEADER_LEN);
set_reply_state(REPLY_STATE_RSET_ROWS);
if (is_opening_cursor())
{
set_cursor_opened();
MXS_INFO("Cursor successfully opened");
set_reply_state(REPLY_STATE_DONE);
}
break;
case REPLY_STATE_RSET_ROWS:
if (cmd == MYSQL_REPLY_EOF && len == MYSQL_EOF_PACKET_LEN - MYSQL_HEADER_LEN)
{
set_reply_state(is_last_eof(it) ? REPLY_STATE_DONE : REPLY_STATE_START);
}
else if (cmd == MYSQL_REPLY_ERR)
{
++it;
update_error(it, end);
set_reply_state(REPLY_STATE_DONE);
}
break;
}
it = end;
}
buffer.release();
}
/**
* @brief Process a possibly partial response from the backend
*
* @param buffer Buffer containing the response
*/
void RWBackend::process_reply(GWBUF* buffer)
{
m_error.clear();
if (current_command() == MXS_COM_STMT_FETCH)
{
// TODO: m_error is not updated here.
// If the server responded with an error, n_eof > 0
if (consume_fetched_rows(buffer))
{
set_reply_state(REPLY_STATE_DONE);
}
}
else if (current_command() == MXS_COM_STATISTICS || GWBUF_IS_COLLECTED_RESULT(buffer))
{
// COM_STATISTICS returns a single string and thus requires special handling.
// Collected result are all in one buffer and need no processing.
set_reply_state(REPLY_STATE_DONE);
}
else
{
// Normal result, process it one packet at a time
process_packets(buffer);
}
if (get_reply_state() == REPLY_STATE_DONE && is_waiting_result())
{
ack_write();
}
}
ResponseStat& RWBackend::response_stat()
{
return m_response_stat;
}
void RWBackend::change_rlag_state(SERVER::RLagState new_state, int max_rlag)
{
mxb_assert(new_state == SERVER::RLagState::BELOW_LIMIT || new_state == SERVER::RLagState::ABOVE_LIMIT);
namespace atom = maxbase::atomic;
auto srv = server();
auto old_state = atom::load(&srv->rlag_state, atom::RELAXED);
if (new_state != old_state)
{
atom::store(&srv->rlag_state, new_state, atom::RELAXED);
// State has just changed, log warning. Don't log catchup if old state was RLAG_NONE.
if (new_state == SERVER::RLagState::ABOVE_LIMIT)
{
MXS_WARNING("Replication lag of '%s' is %is, which is above the configured limit %is. "
"'%s' is excluded from query routing.",
srv->name(), srv->rlag, max_rlag, srv->name());
}
else if (old_state == SERVER::RLagState::ABOVE_LIMIT)
{
MXS_WARNING("Replication lag of '%s' is %is, which is below the configured limit %is. "
"'%s' is returned to query routing.",
srv->name(), srv->rlag, max_rlag, srv->name());
}
}
}
mxs::SRWBackends RWBackend::from_servers(SERVER_REF* servers)
{
SRWBackends backends;
for (SERVER_REF* ref = servers; ref; ref = ref->next)
{
if (ref->active)
{
backends.emplace_back(new mxs::RWBackend(ref));
}
}
return backends;
}
void RWBackend::update_error(Iter it, Iter end)
{
uint16_t code = 0;
code |= (*it++);
code |= (*it++) << 8;
++it;
auto sql_state_begin = it;
it.advance(5);
auto sql_state_end = it;
auto message_begin = sql_state_end;
auto message_end = end;
m_error.set(code, sql_state_begin, sql_state_end, message_begin, message_end);
}
}