The knowledge of which function to call can be internal to RWBackend. This make the use of the class easier as one can simply write to the backend.
438 lines
11 KiB
C++
438 lines
11 KiB
C++
/*
|
|
* Copyright (c) 2016 MariaDB Corporation Ab
|
|
*
|
|
* Use of this software is governed by the Business Source License included
|
|
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
|
|
*
|
|
* Change Date: 2022-01-01
|
|
*
|
|
* On the date above, in accordance with the Business Source License, use
|
|
* of this software will be governed by version 2 or later of the General
|
|
* Public License.
|
|
*/
|
|
|
|
#include <maxscale/protocol/rwbackend.hh>
|
|
|
|
#include <maxscale/modutil.hh>
|
|
#include <maxscale/protocol/mysql.hh>
|
|
|
|
using Iter = mxs::Buffer::iterator;
|
|
|
|
namespace maxscale
|
|
{
|
|
|
|
RWBackend::RWBackend(SERVER_REF* ref)
|
|
: mxs::Backend(ref)
|
|
, m_reply_state(REPLY_STATE_DONE)
|
|
, m_modutil_state{0}
|
|
, m_command(0)
|
|
, m_opening_cursor(false)
|
|
, m_expected_rows(0)
|
|
, m_local_infile_requested(false)
|
|
{
|
|
}
|
|
|
|
RWBackend::~RWBackend()
|
|
{
|
|
}
|
|
|
|
bool RWBackend::execute_session_command()
|
|
{
|
|
m_command = next_session_command()->get_command();
|
|
bool expect_response = mxs_mysql_command_will_respond(m_command);
|
|
bool rval = mxs::Backend::execute_session_command();
|
|
|
|
if (rval && expect_response)
|
|
{
|
|
set_reply_state(REPLY_STATE_START);
|
|
}
|
|
|
|
return rval;
|
|
}
|
|
|
|
bool RWBackend::continue_session_command(GWBUF* buffer)
|
|
{
|
|
return Backend::write(buffer, NO_RESPONSE);
|
|
}
|
|
|
|
void RWBackend::add_ps_handle(uint32_t id, uint32_t handle)
|
|
{
|
|
m_ps_handles[id] = handle;
|
|
MXS_INFO("PS response for %s: %u -> %u", name(), id, handle);
|
|
}
|
|
|
|
uint32_t RWBackend::get_ps_handle(uint32_t id) const
|
|
{
|
|
BackendHandleMap::const_iterator it = m_ps_handles.find(id);
|
|
|
|
if (it != m_ps_handles.end())
|
|
{
|
|
return it->second;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
bool RWBackend::write(GWBUF* buffer, response_type type)
|
|
{
|
|
uint32_t len = mxs_mysql_get_packet_len(buffer);
|
|
bool was_large_query = m_large_query;
|
|
m_large_query = len == MYSQL_PACKET_LENGTH_MAX;
|
|
|
|
if (was_large_query)
|
|
{
|
|
return mxs::Backend::write(buffer, Backend::NO_RESPONSE);
|
|
}
|
|
|
|
if (type == mxs::Backend::EXPECT_RESPONSE)
|
|
{
|
|
/** The server will reply to this command */
|
|
set_reply_state(REPLY_STATE_START);
|
|
}
|
|
|
|
uint8_t cmd = mxs_mysql_get_command(buffer);
|
|
|
|
m_command = cmd;
|
|
|
|
if (mxs_mysql_is_ps_command(cmd))
|
|
{
|
|
uint32_t id = mxs_mysql_extract_ps_id(buffer);
|
|
BackendHandleMap::iterator it = m_ps_handles.find(id);
|
|
|
|
if (it != m_ps_handles.end())
|
|
{
|
|
/** Replace the client handle with the real PS handle */
|
|
uint8_t* ptr = GWBUF_DATA(buffer) + MYSQL_PS_ID_OFFSET;
|
|
gw_mysql_set_byte4(ptr, it->second);
|
|
|
|
if (cmd == MXS_COM_STMT_EXECUTE)
|
|
{
|
|
// Extract the flag byte after the statement ID
|
|
uint8_t flags = 0;
|
|
gwbuf_copy_data(buffer, MYSQL_PS_ID_OFFSET + MYSQL_PS_ID_SIZE, 1, &flags);
|
|
|
|
// Any non-zero flag value means that we have an open cursor
|
|
m_opening_cursor = flags != 0;
|
|
}
|
|
else if (cmd == MXS_COM_STMT_CLOSE)
|
|
{
|
|
m_ps_handles.erase(it);
|
|
}
|
|
else if (cmd == MXS_COM_STMT_FETCH)
|
|
{
|
|
// Number of rows to fetch is a 4 byte integer after the ID
|
|
uint8_t buf[4];
|
|
gwbuf_copy_data(buffer, MYSQL_PS_ID_OFFSET + MYSQL_PS_ID_SIZE, 4, buf);
|
|
m_expected_rows = gw_mysql_get_byte4(buf);
|
|
}
|
|
}
|
|
}
|
|
|
|
return mxs::Backend::write(buffer, type);
|
|
}
|
|
|
|
void RWBackend::close(close_type type)
|
|
{
|
|
m_reply_state = REPLY_STATE_DONE;
|
|
mxs::Backend::close(type);
|
|
}
|
|
|
|
bool RWBackend::consume_fetched_rows(GWBUF* buffer)
|
|
{
|
|
bool rval = false;
|
|
bool more = false;
|
|
int n_eof = modutil_count_signal_packets(buffer, 0, &more, &m_modutil_state);
|
|
|
|
// If the server responded with an error, n_eof > 0
|
|
if (n_eof > 0)
|
|
{
|
|
rval = true;
|
|
}
|
|
else
|
|
{
|
|
m_expected_rows -= modutil_count_packets(buffer);
|
|
mxb_assert(m_expected_rows >= 0);
|
|
rval = m_expected_rows == 0;
|
|
}
|
|
|
|
return rval;
|
|
}
|
|
|
|
static inline bool have_next_packet(GWBUF* buffer)
|
|
{
|
|
uint32_t len = MYSQL_GET_PAYLOAD_LEN(GWBUF_DATA(buffer)) + MYSQL_HEADER_LEN;
|
|
return gwbuf_length(buffer) > len;
|
|
}
|
|
|
|
uint64_t get_encoded_int(Iter it)
|
|
{
|
|
uint64_t len = *it++;
|
|
|
|
switch (len)
|
|
{
|
|
case 0xfc:
|
|
len = *it++;
|
|
len |= ((uint64_t)*it++) << 8;
|
|
break;
|
|
|
|
case 0xfd:
|
|
len = *it++;
|
|
len |= ((uint64_t)*it++) << 8;
|
|
len |= ((uint64_t)*it++) << 16;
|
|
break;
|
|
|
|
case 0xfe:
|
|
len = *it++;
|
|
len |= ((uint64_t)*it++) << 8;
|
|
len |= ((uint64_t)*it++) << 16;
|
|
len |= ((uint64_t)*it++) << 24;
|
|
len |= ((uint64_t)*it++) << 32;
|
|
len |= ((uint64_t)*it++) << 40;
|
|
len |= ((uint64_t)*it++) << 48;
|
|
len |= ((uint64_t)*it++) << 56;
|
|
break;
|
|
|
|
default:
|
|
break;
|
|
}
|
|
|
|
return len;
|
|
}
|
|
|
|
Iter skip_encoded_int(Iter it)
|
|
{
|
|
switch (*it)
|
|
{
|
|
case 0xfc:
|
|
std::advance(it, 3);
|
|
break;
|
|
|
|
case 0xfd:
|
|
std::advance(it, 4);
|
|
break;
|
|
|
|
case 0xfe:
|
|
std::advance(it, 9);
|
|
break;
|
|
|
|
default:
|
|
std::advance(it, 1);
|
|
break;
|
|
}
|
|
|
|
return it;
|
|
}
|
|
|
|
bool is_last_ok(Iter it)
|
|
{
|
|
++it; // Skip the command byte
|
|
it = skip_encoded_int(it); // Affected rows
|
|
it = skip_encoded_int(it); // Last insert ID
|
|
uint16_t status = *it++;
|
|
status |= (*it++) << 8;
|
|
return (status & SERVER_MORE_RESULTS_EXIST) == 0;
|
|
}
|
|
|
|
bool is_last_eof(Iter it)
|
|
{
|
|
std::advance(it, 3); // Skip the command byte and warning count
|
|
uint16_t status = *it++;
|
|
status |= (*it++) << 8;
|
|
return (status & SERVER_MORE_RESULTS_EXIST) == 0;
|
|
}
|
|
|
|
void RWBackend::process_reply_start(Iter it)
|
|
{
|
|
uint8_t cmd = *it;
|
|
m_local_infile_requested = false;
|
|
|
|
switch (cmd)
|
|
{
|
|
case MYSQL_REPLY_OK:
|
|
if (is_last_ok(it))
|
|
{
|
|
// No more results
|
|
set_reply_state(REPLY_STATE_DONE);
|
|
}
|
|
break;
|
|
|
|
case MYSQL_REPLY_LOCAL_INFILE:
|
|
m_local_infile_requested = true;
|
|
set_reply_state(REPLY_STATE_DONE);
|
|
break;
|
|
|
|
case MYSQL_REPLY_ERR:
|
|
// Nothing ever follows an error packet
|
|
set_reply_state(REPLY_STATE_DONE);
|
|
break;
|
|
|
|
case MYSQL_REPLY_EOF:
|
|
// EOF packets are never expected as the first response
|
|
mxb_assert(!true);
|
|
break;
|
|
|
|
default:
|
|
if (current_command() == MXS_COM_FIELD_LIST)
|
|
{
|
|
// COM_FIELD_LIST sends a strange kind of a result set
|
|
set_reply_state(REPLY_STATE_RSET_ROWS);
|
|
}
|
|
else
|
|
{
|
|
// Start of a result set
|
|
m_num_coldefs = get_encoded_int(it);
|
|
set_reply_state(REPLY_STATE_RSET_COLDEF);
|
|
}
|
|
|
|
break;
|
|
}
|
|
}
|
|
|
|
void RWBackend::process_packets(GWBUF* result)
|
|
{
|
|
mxs::Buffer buffer(result);
|
|
auto it = buffer.begin();
|
|
|
|
while (it != buffer.end())
|
|
{
|
|
// Extract packet length and command byte
|
|
uint32_t len = *it++;
|
|
len |= (*it++) << 8;
|
|
len |= (*it++) << 16;
|
|
++it; // Skip the sequence
|
|
mxb_assert(it != buffer.end());
|
|
auto end = std::next(it, len);
|
|
uint8_t cmd = *it;
|
|
|
|
switch (m_reply_state)
|
|
{
|
|
case REPLY_STATE_START:
|
|
process_reply_start(it);
|
|
break;
|
|
|
|
case REPLY_STATE_DONE:
|
|
// This should never happen
|
|
MXS_ERROR("Unexpected result state. cmd: 0x%02hhx, len: %u", cmd, len);
|
|
mxb_assert(!true);
|
|
break;
|
|
|
|
case REPLY_STATE_RSET_COLDEF:
|
|
mxb_assert(m_num_coldefs > 0);
|
|
--m_num_coldefs;
|
|
|
|
if (m_num_coldefs == 0)
|
|
{
|
|
set_reply_state(REPLY_STATE_RSET_COLDEF_EOF);
|
|
// Skip this state when DEPRECATE_EOF capability is supported
|
|
}
|
|
break;
|
|
|
|
case REPLY_STATE_RSET_COLDEF_EOF:
|
|
mxb_assert(cmd == MYSQL_REPLY_EOF && len == MYSQL_EOF_PACKET_LEN - MYSQL_HEADER_LEN);
|
|
set_reply_state(REPLY_STATE_RSET_ROWS);
|
|
|
|
if (is_opening_cursor())
|
|
{
|
|
set_cursor_opened();
|
|
MXS_INFO("Cursor successfully opened");
|
|
set_reply_state(REPLY_STATE_DONE);
|
|
}
|
|
break;
|
|
|
|
case REPLY_STATE_RSET_ROWS:
|
|
if (cmd == MYSQL_REPLY_EOF && len == MYSQL_EOF_PACKET_LEN - MYSQL_HEADER_LEN)
|
|
{
|
|
set_reply_state(is_last_eof(it) ? REPLY_STATE_DONE : REPLY_STATE_START);
|
|
}
|
|
else if (cmd == MYSQL_REPLY_ERR)
|
|
{
|
|
set_reply_state(REPLY_STATE_DONE);
|
|
}
|
|
break;
|
|
}
|
|
|
|
it = end;
|
|
}
|
|
|
|
buffer.release();
|
|
}
|
|
|
|
/**
|
|
* @brief Process a possibly partial response from the backend
|
|
*
|
|
* @param buffer Buffer containing the response
|
|
*/
|
|
void RWBackend::process_reply(GWBUF* buffer)
|
|
{
|
|
if (current_command() == MXS_COM_STMT_FETCH)
|
|
{
|
|
// If the server responded with an error, n_eof > 0
|
|
if (consume_fetched_rows(buffer))
|
|
{
|
|
set_reply_state(REPLY_STATE_DONE);
|
|
}
|
|
}
|
|
else if (current_command() == MXS_COM_STATISTICS || GWBUF_IS_COLLECTED_RESULT(buffer))
|
|
{
|
|
// COM_STATISTICS returns a single string and thus requires special handling.
|
|
// Collected result are all in one buffer and need no processing.
|
|
set_reply_state(REPLY_STATE_DONE);
|
|
}
|
|
else
|
|
{
|
|
// Normal result, process it one packet at a time
|
|
process_packets(buffer);
|
|
}
|
|
|
|
if (get_reply_state() == REPLY_STATE_DONE)
|
|
{
|
|
ack_write();
|
|
}
|
|
}
|
|
|
|
ResponseStat& RWBackend::response_stat()
|
|
{
|
|
return m_response_stat;
|
|
}
|
|
|
|
void RWBackend::change_rlag_state(SERVER::RLagState new_state, int max_rlag)
|
|
{
|
|
mxb_assert(new_state == SERVER::RLagState::BELOW_LIMIT || new_state == SERVER::RLagState::ABOVE_LIMIT);
|
|
namespace atom = maxbase::atomic;
|
|
auto srv = server();
|
|
auto old_state = atom::load(&srv->rlag_state, atom::RELAXED);
|
|
if (new_state != old_state)
|
|
{
|
|
atom::store(&srv->rlag_state, new_state, atom::RELAXED);
|
|
// State has just changed, log warning. Don't log catchup if old state was RLAG_NONE.
|
|
if (new_state == SERVER::RLagState::ABOVE_LIMIT)
|
|
{
|
|
MXS_WARNING("Replication lag of '%s' is %is, which is above the configured limit %is. "
|
|
"'%s' is excluded from query routing.",
|
|
srv->name(), srv->rlag, max_rlag, srv->name());
|
|
}
|
|
else if (old_state == SERVER::RLagState::ABOVE_LIMIT)
|
|
{
|
|
MXS_WARNING("Replication lag of '%s' is %is, which is below the configured limit %is. "
|
|
"'%s' is returned to query routing.",
|
|
srv->name(), srv->rlag, max_rlag, srv->name());
|
|
}
|
|
}
|
|
}
|
|
|
|
mxs::SRWBackends RWBackend::from_servers(SERVER_REF* servers)
|
|
{
|
|
SRWBackends backends;
|
|
|
|
for (SERVER_REF* ref = servers; ref; ref = ref->next)
|
|
{
|
|
if (ref->active)
|
|
{
|
|
backends.emplace_back(new mxs::RWBackend(ref));
|
|
}
|
|
}
|
|
|
|
return backends;
|
|
}
|
|
}
|