500 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			500 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/*
 * Copyright (c) 2016 MariaDB Corporation Ab
 *
 * Use of this software is governed by the Business Source License included
 * in the LICENSE.TXT file and at www.mariadb.com/bsl11.
 *
 * Change Date: 2024-10-14
 *
 * On the date above, in accordance with the Business Source License, use
 * of this software will be governed by version 2 or later of the General
 * Public License.
 */
 | 
						|
 | 
						|
#include <maxscale/protocol/rwbackend.hh>
 | 
						|
 | 
						|
#include <maxscale/modutil.hh>
 | 
						|
#include <maxscale/protocol/mysql.hh>
 | 
						|
 | 
						|
using Iter = mxs::Buffer::iterator;
 | 
						|
 | 
						|
namespace maxscale
 | 
						|
{
 | 
						|
 | 
						|
RWBackend::RWBackend(SERVER_REF* ref)
 | 
						|
    : mxs::Backend(ref)
 | 
						|
    , m_reply_state(REPLY_STATE_DONE)
 | 
						|
    , m_modutil_state{0}
 | 
						|
    , m_command(0)
 | 
						|
    , m_opening_cursor(false)
 | 
						|
    , m_expected_rows(0)
 | 
						|
    , m_local_infile_requested(false)
 | 
						|
{
 | 
						|
}
 | 
						|
 | 
						|
/** Destructor; performs no work of its own. */
RWBackend::~RWBackend() = default;
 | 
						|
 | 
						|
bool RWBackend::execute_session_command()
 | 
						|
{
 | 
						|
    m_command = next_session_command()->get_command();
 | 
						|
    bool expect_response = mxs_mysql_command_will_respond(m_command);
 | 
						|
    bool rval = mxs::Backend::execute_session_command();
 | 
						|
 | 
						|
    if (rval && expect_response)
 | 
						|
    {
 | 
						|
        set_reply_state(REPLY_STATE_START);
 | 
						|
        m_size = 0;
 | 
						|
    }
 | 
						|
 | 
						|
    return rval;
 | 
						|
}
 | 
						|
 | 
						|
/**
 * Write a follow-up buffer of a session command to the backend.
 *
 * The call is explicitly qualified so that it goes straight to the base class
 * write instead of RWBackend::write, and no response is expected for it.
 *
 * @param buffer  Buffer to write
 *
 * @return The result of mxs::Backend::write()
 */
bool RWBackend::continue_session_command(GWBUF* buffer)
{
    return mxs::Backend::write(buffer, mxs::Backend::NO_RESPONSE);
}
 | 
						|
 | 
						|
void RWBackend::add_ps_handle(uint32_t id, uint32_t handle)
 | 
						|
{
 | 
						|
    m_ps_handles[id] = handle;
 | 
						|
    MXS_INFO("PS response for %s: %u -> %u", name(), id, handle);
 | 
						|
}
 | 
						|
 | 
						|
uint32_t RWBackend::get_ps_handle(uint32_t id) const
 | 
						|
{
 | 
						|
    BackendHandleMap::const_iterator it = m_ps_handles.find(id);
 | 
						|
 | 
						|
    if (it != m_ps_handles.end())
 | 
						|
    {
 | 
						|
        return it->second;
 | 
						|
    }
 | 
						|
 | 
						|
    return 0;
 | 
						|
}
 | 
						|
 | 
						|
/**
 * Write a buffer to the backend and update the reply tracking state.
 *
 * If the previously written packet had a length equal to
 * MYSQL_PACKET_LENGTH_MAX, this buffer is treated as a continuation of it and
 * is written as-is with no response expected. Otherwise the command byte is
 * recorded and, for prepared statement commands, the statement ID in the
 * packet is replaced with the handle stored for it in m_ps_handles.
 *
 * @param buffer  Buffer to write
 * @param type    Whether a response is expected for this buffer
 *
 * @return The result of mxs::Backend::write()
 */
bool RWBackend::write(GWBUF* buffer, response_type type)
{
    const uint32_t packet_len = mxs_mysql_get_packet_len(buffer);
    const bool continues_large_query = m_large_query;
    m_large_query = (packet_len == MYSQL_PACKET_LENGTH_MAX);

    if (continues_large_query)
    {
        // Trailing part of a large query: pass it through untouched
        return mxs::Backend::write(buffer, Backend::NO_RESPONSE);
    }

    if (type == mxs::Backend::EXPECT_RESPONSE)
    {
        /** The server will reply to this command */
        set_reply_state(REPLY_STATE_START);
        m_size = 0;
    }

    const uint8_t cmd = mxs_mysql_get_command(buffer);
    m_command = cmd;

    if (!mxs_mysql_is_ps_command(cmd))
    {
        return mxs::Backend::write(buffer, type);
    }

    // We need to completely separate the buffer this backend owns and the one that the caller owns to
    // prevent any modifications from affecting the one that was written through this backend. If the
    // buffer gets placed into the write queue of the DCB, subsequent modifications to the original buffer
    // would be propagated to the one this backend owns.
    GWBUF* caller_buffer = buffer;
    buffer = gwbuf_deep_clone(caller_buffer);
    gwbuf_free(caller_buffer);

    const uint32_t id = mxs_mysql_extract_ps_id(buffer);
    auto handle_it = m_ps_handles.find(id);

    if (handle_it != m_ps_handles.end())
    {
        /** Replace the client handle with the real PS handle */
        uint8_t* id_ptr = GWBUF_DATA(buffer) + MYSQL_PS_ID_OFFSET;
        gw_mysql_set_byte4(id_ptr, handle_it->second);

        switch (cmd)
        {
        case MXS_COM_STMT_EXECUTE:
            {
                // Extract the flag byte after the statement ID
                uint8_t flags = 0;
                gwbuf_copy_data(buffer, MYSQL_PS_ID_OFFSET + MYSQL_PS_ID_SIZE, 1, &flags);

                // Any non-zero flag value means that we have an open cursor
                m_opening_cursor = flags != 0;
            }
            break;

        case MXS_COM_STMT_CLOSE:
            // The statement is being closed, so the mapping is dropped
            m_ps_handles.erase(handle_it);
            break;

        case MXS_COM_STMT_FETCH:
            {
                // Number of rows to fetch is a 4 byte integer after the ID
                uint8_t row_count[4];
                gwbuf_copy_data(buffer, MYSQL_PS_ID_OFFSET + MYSQL_PS_ID_SIZE, 4, row_count);
                m_expected_rows = gw_mysql_get_byte4(row_count);
            }
            break;

        default:
            break;
        }
    }

    return mxs::Backend::write(buffer, type);
}
 | 
						|
 | 
						|
/**
 * Close the backend.
 *
 * The reply state is reset to REPLY_STATE_DONE before the base class close
 * is invoked.
 *
 * @param type  Close type forwarded to mxs::Backend::close()
 */
void RWBackend::close(close_type type)
{
    m_reply_state = REPLY_STATE_DONE;
    mxs::Backend::close(type);
}
 | 
						|
 | 
						|
/**
 * Account for the packets of a COM_STMT_FETCH response.
 *
 * @param buffer  Buffer containing (part of) the response
 *
 * @return True if a signal packet was seen or the expected row count has
 *         dropped to zero, false otherwise
 */
bool RWBackend::consume_fetched_rows(GWBUF* buffer)
{
    bool more = false;
    const int n_eof = modutil_count_signal_packets(buffer, 0, &more, &m_modutil_state);

    // If the server responded with an error, n_eof > 0
    if (n_eof > 0)
    {
        return true;
    }

    m_expected_rows -= modutil_count_packets(buffer);
    // NOTE(review): if m_expected_rows is declared with an unsigned type this assertion can never
    // fire and an over-count would wrap around instead - confirm against the declaration.
    mxb_assert(m_expected_rows >= 0);

    return m_expected_rows == 0;
}
 | 
						|
 | 
						|
/**
 * Check whether the buffer holds more data than its first packet.
 *
 * @param buffer  Buffer that starts with a packet header
 *
 * @return True if the buffer is longer than the first packet (payload length
 *         read from the header plus MYSQL_HEADER_LEN)
 */
static inline bool have_next_packet(GWBUF* buffer)
{
    const uint32_t first_packet_len = MYSQL_GET_PAYLOAD_LEN(GWBUF_DATA(buffer)) + MYSQL_HEADER_LEN;
    return first_packet_len < gwbuf_length(buffer);
}
 | 
						|
 | 
						|
/**
 * Decode a length-encoded integer.
 *
 * A first byte of 0xfc, 0xfd or 0xfe announces a 2, 3 or 8 byte little-endian
 * value that follows it. Any other first byte is returned as the value itself.
 *
 * @param it  Iterator pointing at the first byte of the encoded integer
 *
 * @return The decoded value
 */
uint64_t get_encoded_int(Iter it)
{
    const uint64_t prefix = *it++;
    int n_bytes = 0;

    switch (prefix)
    {
    case 0xfc:
        n_bytes = 2;
        break;

    case 0xfd:
        n_bytes = 3;
        break;

    case 0xfe:
        n_bytes = 8;
        break;

    default:
        // The first byte is the value
        return prefix;
    }

    // Assemble the value from the following bytes, least significant byte first
    uint64_t value = 0;

    for (int i = 0; i < n_bytes; ++i)
    {
        value |= static_cast<uint64_t>(*it++) << (8 * i);
    }

    return value;
}
 | 
						|
 | 
						|
/**
 * Step over a length-encoded integer.
 *
 * The prefixes 0xfc, 0xfd and 0xfe occupy 3, 4 and 9 bytes in total (prefix
 * included); every other first byte is a one byte value.
 *
 * @param it  Iterator pointing at the first byte of the encoded integer
 *
 * @return Iterator pointing at the first byte after the encoded integer
 */
Iter skip_encoded_int(Iter it)
{
    const auto prefix = *it;

    if (prefix == 0xfc)
    {
        it.advance(3);
    }
    else if (prefix == 0xfd)
    {
        it.advance(4);
    }
    else if (prefix == 0xfe)
    {
        it.advance(9);
    }
    else
    {
        ++it;
    }

    return it;
}
 | 
						|
 | 
						|
/**
 * Check whether an OK packet is the last one of the response.
 *
 * @param it  Iterator pointing at the command byte of the OK packet
 *
 * @return True if SERVER_MORE_RESULTS_EXIST is not set in the status flags
 */
bool is_last_ok(Iter it)
{
    ++it;                                           // Skip the command byte
    it = skip_encoded_int(skip_encoded_int(it));    // Affected rows, then last insert ID

    // The two byte status field is little-endian
    const uint16_t low = *it++;
    const uint16_t high = *it++;
    const uint16_t status = static_cast<uint16_t>(low | (high << 8));

    return (status & SERVER_MORE_RESULTS_EXIST) == 0;
}
 | 
						|
 | 
						|
/**
 * Check whether an EOF packet is the last one of the response.
 *
 * @param it  Iterator pointing at the command byte of the EOF packet
 *
 * @return True if SERVER_MORE_RESULTS_EXIST is not set in the status flags
 */
bool is_last_eof(Iter it)
{
    std::advance(it, 3);    // Skip the command byte and warning count

    // The two byte status field is little-endian
    const uint16_t low = *it++;
    const uint16_t high = *it++;
    const uint16_t status = static_cast<uint16_t>(low | (high << 8));

    return (status & SERVER_MORE_RESULTS_EXIST) == 0;
}
 | 
						|
 | 
						|
/**
 * Process the first packet of a response and choose the next reply state.
 *
 * The LOCAL INFILE flag is cleared on every call and set again only when the
 * packet is a LOCAL INFILE request.
 *
 * @param it   Iterator pointing at the first payload byte of the packet
 * @param end  Iterator pointing past the end of the packet
 */
void RWBackend::process_reply_start(Iter it, Iter end)
{
    const uint8_t cmd = *it;
    m_local_infile_requested = false;

    switch (cmd)
    {
    case MYSQL_REPLY_OK:
        // The state only changes if no more results follow this OK packet
        if (is_last_ok(it))
        {
            set_reply_state(REPLY_STATE_DONE);
        }
        break;

    case MYSQL_REPLY_LOCAL_INFILE:
        m_local_infile_requested = true;
        set_reply_state(REPLY_STATE_DONE);
        break;

    case MYSQL_REPLY_ERR:
        // Nothing ever follows an error packet
        update_error(++it, end);
        set_reply_state(REPLY_STATE_DONE);
        break;

    case MYSQL_REPLY_EOF:
        // EOF packets are never expected as the first response
        mxb_assert(!true);
        break;

    default:
        if (current_command() != MXS_COM_FIELD_LIST)
        {
            // Start of a result set: the packet carries the column definition count
            m_num_coldefs = get_encoded_int(it);
            set_reply_state(REPLY_STATE_RSET_COLDEF);
        }
        else
        {
            // COM_FIELD_LIST sends a strange kind of a result set
            set_reply_state(REPLY_STATE_RSET_ROWS);
        }
        break;
    }
}
 | 
						|
 | 
						|
/**
 * Process a buffer of complete packets and advance the reply state machine.
 *
 * Each packet header is decoded, the payload length is added to m_size and the
 * packet is then handled according to the current reply state. A packet that
 * follows one with a payload of GW_MYSQL_MAX_PACKET_LEN bytes is the tail of
 * that large packet and is skipped without being inspected.
 *
 * The buffer is only borrowed: it is released from the local mxs::Buffer
 * wrapper before returning.
 *
 * @param result  Buffer containing the packets; the debug assertion requires
 *                the service to have packet or statement output capability
 */
void RWBackend::process_packets(GWBUF* result)
{
    mxs::Buffer buffer(result);
    auto it = buffer.begin();
    MXB_AT_DEBUG(size_t total_len = buffer.length());
    MXB_AT_DEBUG(size_t used_len = 0);
    mxb_assert(dcb()->session->service->capabilities & (RCAP_TYPE_PACKET_OUTPUT | RCAP_TYPE_STMT_OUTPUT));

    while (it != buffer.end())
    {
        // Extract the three byte little-endian payload length
        uint32_t len = *it++;
        len |= (*it++) << 8;
        len |= (*it++) << 16;
        ++it;   // Skip the sequence

        // An empty payload is legal: it terminates a large packet whose size is an exact multiple of
        // the maximum payload length. In that case the iterator may already be at the end of the buffer.
        mxb_assert(len == 0 || it != buffer.end());
        mxb_assert(used_len + len <= total_len);
        MXB_AT_DEBUG(used_len += len);
        auto end = it;
        end.advance(len);

        m_size += len;

        // Ignore the tail end of a large packet. Only resultsets can generate packets this large
        // and we don't care what the contents are and thus it is safe to ignore it.
        bool skip_next = m_skip_next;
        m_skip_next = len == GW_MYSQL_MAX_PACKET_LEN;

        if (skip_next)
        {
            it = end;
            continue;
        }

        // Read the command byte only for packets that are actually processed. Reading it before the
        // skip check would dereference the end iterator when an empty trailing packet ends the buffer.
        uint8_t cmd = *it;

        switch (m_reply_state)
        {
        case REPLY_STATE_START:
            process_reply_start(it, end);
            break;

        case REPLY_STATE_DONE:
            if (cmd == MYSQL_REPLY_ERR)
            {
                update_error(++it, end);
            }
            else
            {
                // This should never happen
                MXS_ERROR("Unexpected result state. cmd: 0x%02hhx, len: %u", cmd, len);
                mxb_assert(!true);
            }
            break;

        case REPLY_STATE_RSET_COLDEF:
            // One column definition packet consumed
            mxb_assert(m_num_coldefs > 0);
            --m_num_coldefs;

            if (m_num_coldefs == 0)
            {
                set_reply_state(REPLY_STATE_RSET_COLDEF_EOF);
                // Skip this state when DEPRECATE_EOF capability is supported
            }
            break;

        case REPLY_STATE_RSET_COLDEF_EOF:
            mxb_assert(cmd == MYSQL_REPLY_EOF && len == MYSQL_EOF_PACKET_LEN - MYSQL_HEADER_LEN);
            set_reply_state(REPLY_STATE_RSET_ROWS);

            if (is_opening_cursor())
            {
                // When a cursor is opened, the response ends after the column definitions
                set_cursor_opened();
                MXS_INFO("Cursor successfully opened");
                set_reply_state(REPLY_STATE_DONE);
            }
            break;

        case REPLY_STATE_RSET_ROWS:
            if (cmd == MYSQL_REPLY_EOF && len == MYSQL_EOF_PACKET_LEN - MYSQL_HEADER_LEN)
            {
                // End of this result set; another one starts if more results exist
                set_reply_state(is_last_eof(it) ? REPLY_STATE_DONE : REPLY_STATE_START);
            }
            else if (cmd == MYSQL_REPLY_ERR)
            {
                ++it;
                update_error(it, end);
                set_reply_state(REPLY_STATE_DONE);
            }
            break;
        }

        it = end;
    }

    // The caller keeps ownership of the underlying GWBUF
    buffer.release();
}
 | 
						|
 | 
						|
/**
 | 
						|
 * @brief Process a possibly partial response from the backend
 | 
						|
 *
 | 
						|
 * @param buffer  Buffer containing the response
 | 
						|
 */
 | 
						|
void RWBackend::process_reply(GWBUF* buffer)
 | 
						|
{
 | 
						|
    m_error.clear();
 | 
						|
 | 
						|
    if (current_command() == MXS_COM_BINLOG_DUMP)
 | 
						|
    {
 | 
						|
        // Treat COM_BINLOG_DUMP like a response that never ends
 | 
						|
    }
 | 
						|
    else if (current_command() == MXS_COM_STMT_FETCH)
 | 
						|
    {
 | 
						|
        // TODO: m_error is not updated here.
 | 
						|
        // If the server responded with an error, n_eof > 0
 | 
						|
        if (consume_fetched_rows(buffer))
 | 
						|
        {
 | 
						|
            set_reply_state(REPLY_STATE_DONE);
 | 
						|
        }
 | 
						|
    }
 | 
						|
    else if (current_command() == MXS_COM_STATISTICS || GWBUF_IS_COLLECTED_RESULT(buffer))
 | 
						|
    {
 | 
						|
        // COM_STATISTICS returns a single string and thus requires special handling.
 | 
						|
        // Collected result are all in one buffer and need no processing.
 | 
						|
        set_reply_state(REPLY_STATE_DONE);
 | 
						|
    }
 | 
						|
    else
 | 
						|
    {
 | 
						|
        // Normal result, process it one packet at a time
 | 
						|
        process_packets(buffer);
 | 
						|
    }
 | 
						|
 | 
						|
    if (get_reply_state() == REPLY_STATE_DONE && is_waiting_result())
 | 
						|
    {
 | 
						|
        ack_write();
 | 
						|
    }
 | 
						|
}
 | 
						|
 | 
						|
/**
 * Access the response statistics of this backend.
 *
 * @return Mutable reference to the m_response_stat member
 */
ResponseStat& RWBackend::response_stat()
{
    return m_response_stat;
}
 | 
						|
 | 
						|
/**
 * Update the replication lag state of the server and log when it changes.
 *
 * The stored state is read and, if different, written with relaxed atomic
 * operations. A warning is logged when the server goes above the limit, and
 * when it drops back from above the limit.
 *
 * @param new_state  New state; must be BELOW_LIMIT or ABOVE_LIMIT
 * @param max_rlag   Configured replication lag limit, used in the log message
 */
void RWBackend::change_rlag_state(SERVER::RLagState new_state, int max_rlag)
{
    mxb_assert(new_state == SERVER::RLagState::BELOW_LIMIT || new_state == SERVER::RLagState::ABOVE_LIMIT);
    namespace atom = maxbase::atomic;
    auto srv = server();
    auto old_state = atom::load(&srv->rlag_state, atom::RELAXED);

    if (new_state == old_state)
    {
        // Nothing changed, nothing to store or log
        return;
    }

    atom::store(&srv->rlag_state, new_state, atom::RELAXED);

    // State has just changed, log warning. Don't log catchup if old state was RLAG_NONE.
    if (new_state == SERVER::RLagState::ABOVE_LIMIT)
    {
        MXS_WARNING("Replication lag of '%s' is %is, which is above the configured limit %is. "
                    "'%s' is excluded from query routing.",
                    srv->name(), srv->rlag, max_rlag, srv->name());
    }
    else if (old_state == SERVER::RLagState::ABOVE_LIMIT)
    {
        MXS_WARNING("Replication lag of '%s' is %is, which is below the configured limit %is. "
                    "'%s' is returned to query routing.",
                    srv->name(), srv->rlag, max_rlag, srv->name());
    }
}
 | 
						|
 | 
						|
/**
 * Create a backend for every active server in a linked list of references.
 *
 * @param servers  Head of the server reference list; may be null
 *
 * @return Backends for the active references, in list order
 */
mxs::SRWBackends RWBackend::from_servers(SERVER_REF* servers)
{
    SRWBackends backends;

    for (SERVER_REF* ref = servers; ref; ref = ref->next)
    {
        if (!ref->active)
        {
            // Inactive references get no backend
            continue;
        }

        backends.emplace_back(new mxs::RWBackend(ref));
    }

    return backends;
}
 | 
						|
 | 
						|
/**
 * Extract the error information of an error packet into m_error.
 *
 * @param it   Iterator pointing at the two byte error code, i.e. just past
 *             the command byte of the packet
 * @param end  Iterator pointing past the end of the packet; the message
 *             extends up to it
 */
void RWBackend::update_error(Iter it, Iter end)
{
    // Two byte little-endian error code
    uint16_t code = *it++;
    code |= (*it++) << 8;

    // One byte is skipped here - presumably the '#' SQL state marker; verify against the protocol
    ++it;

    // Five bytes of SQL state, the rest of the packet is the message
    auto sql_state_begin = it;
    it.advance(5);
    auto sql_state_end = it;
    auto message_begin = it;
    auto message_end = end;

    m_error.set(code, sql_state_begin, sql_state_end, message_begin, message_end);
}
 | 
						|
}
 |