
This is essentially just a search and replace to change SRWBackend to RWBackend* and SRWBackendList to PRWBackends, a vector of a raw pointers. In the next few commits vector<unique_ptr<RWBackend>> will be used for life time management. There are a lot of diffs from the global search and replace. Only a few manual edits had to be done. list-src -x build | xargs sed -ri 's/SRWBackends/prwbackends/g' list-src -x build | xargs sed -ri 's/const mxs::SRWBackend\&/const mxs::RWBackend\*/g' list-src -x build | xargs sed -ri 's/const SRWBackend\&/const RWBackend\*/g' list-src -x build | xargs sed -ri 's/mxs::SRWBackend\&/mxs::RWBackend\*/g' list-src -x build | xargs sed -ri 's/mxs::SRWBackend/mxs::RWBackend\*/g' list-src -x build | xargs sed -ri 's/SRWBackend\(\)/nullptr/g' list-src -x build | xargs sed -ri 's/mxs::SRWBackend\&/mxs::RWBackend\*/g' list-src -x build | xargs sed -ri 's/mxs::SRWBackend/mxs::RWBackend\*/g' list-src -x build | xargs sed -ri 's/SRWBackend\&/RWBackend\*/g' list-src -x build | xargs sed -ri 's/SRWBackend\b/RWBackend\*/g' list-src -x build | xargs sed -ri 's/prwbackends/PRWBackends/g'
160 lines
5.3 KiB
C++
160 lines
5.3 KiB
C++
/*
|
|
* Copyright (c) 2016 MariaDB Corporation Ab
|
|
*
|
|
* Use of this software is governed by the Business Source License included
|
|
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
|
|
*
|
|
* Change Date: 2022-01-01
|
|
*
|
|
* On the date above, in accordance with the Business Source License, use
|
|
* of this software will be governed by version 2 or later of the General
|
|
* Public License.
|
|
*/
|
|
|
|
#include "readwritesplit.hh"
|
|
#include "rwsplitsession.hh"
|
|
|
|
#include <stdio.h>
|
|
#include <strings.h>
|
|
#include <string.h>
|
|
#include <stdlib.h>
|
|
#include <stdint.h>
|
|
|
|
#include <maxscale/router.hh>
|
|
|
|
using namespace maxscale;
|
|
|
|
/**
|
|
* Functions for session command handling
|
|
*/
|
|
|
|
|
|
static std::string extract_error(GWBUF* buffer)
|
|
{
|
|
std::string rval;
|
|
|
|
if (MYSQL_IS_ERROR_PACKET(((uint8_t*)GWBUF_DATA(buffer))))
|
|
{
|
|
size_t replylen = MYSQL_GET_PAYLOAD_LEN(GWBUF_DATA(buffer));
|
|
char replybuf[replylen];
|
|
gwbuf_copy_data(buffer, 0, sizeof(replybuf), (uint8_t*)replybuf);
|
|
std::string err;
|
|
std::string msg;
|
|
err.append(replybuf + 8, 5);
|
|
msg.append(replybuf + 13, replylen - 4 - 5);
|
|
rval = err + ": " + msg;
|
|
}
|
|
|
|
return rval;
|
|
}
|
|
|
|
/**
|
|
* Discards the slave connection if its response differs from the master's response
|
|
*
|
|
* @param backend The slave Backend
|
|
* @param master_cmd Master's reply
|
|
* @param slave_cmd Slave's reply
|
|
*/
|
|
static void discard_if_response_differs(RWBackend* backend,
|
|
uint8_t master_response,
|
|
uint8_t slave_response,
|
|
SSessionCommand sescmd)
|
|
{
|
|
if (master_response != slave_response)
|
|
{
|
|
uint8_t cmd = sescmd->get_command();
|
|
std::string query = sescmd->to_string();
|
|
MXS_WARNING("Slave server '%s': response (0x%02hhx) differs "
|
|
"from master's response (0x%02hhx) to %s: `%s`. "
|
|
"Closing slave connection due to inconsistent session state.",
|
|
backend->name(),
|
|
slave_response,
|
|
master_response,
|
|
STRPACKETTYPE(cmd),
|
|
query.empty() ? "<no query>" : query.c_str());
|
|
backend->close(mxs::Backend::CLOSE_FATAL);
|
|
}
|
|
}
|
|
|
|
void RWSplitSession::process_sescmd_response(RWBackend* backend, GWBUF** ppPacket)
|
|
{
|
|
if (backend->has_session_commands())
|
|
{
|
|
mxb_assert(GWBUF_IS_COLLECTED_RESULT(*ppPacket));
|
|
uint8_t cmd;
|
|
gwbuf_copy_data(*ppPacket, MYSQL_HEADER_LEN, 1, &cmd);
|
|
uint8_t command = backend->next_session_command()->get_command();
|
|
mxs::SSessionCommand sescmd = backend->next_session_command();
|
|
uint64_t id = backend->complete_session_command();
|
|
MXS_PS_RESPONSE resp = {};
|
|
bool discard = true;
|
|
|
|
if (command == MXS_COM_STMT_PREPARE && cmd != MYSQL_REPLY_ERR)
|
|
{
|
|
// This should never fail or the backend protocol is broken
|
|
MXB_AT_DEBUG(bool b = ) mxs_mysql_extract_ps_response(*ppPacket, &resp);
|
|
mxb_assert(b);
|
|
backend->add_ps_handle(id, resp.id);
|
|
}
|
|
|
|
if (m_recv_sescmd < m_sent_sescmd && id == m_recv_sescmd + 1)
|
|
{
|
|
if (!m_current_master || !m_current_master->in_use()// Session doesn't have a master
|
|
|| m_current_master == backend) // This is the master's response
|
|
{
|
|
/** First reply to this session command, route it to the client */
|
|
++m_recv_sescmd;
|
|
discard = false;
|
|
|
|
/** Store the master's response so that the slave responses can
|
|
* be compared to it */
|
|
m_sescmd_responses[id] = cmd;
|
|
|
|
if (cmd == MYSQL_REPLY_ERR)
|
|
{
|
|
MXS_INFO("Session command no. %lu failed: %s",
|
|
id,
|
|
extract_error(*ppPacket).c_str());
|
|
}
|
|
else if (command == MXS_COM_STMT_PREPARE)
|
|
{
|
|
/** Map the returned response to the internal ID */
|
|
MXS_INFO("PS ID %u maps to internal ID %lu", resp.id, id);
|
|
m_qc.ps_id_internal_put(resp.id, id);
|
|
}
|
|
|
|
// Discard any slave connections that did not return the same result
|
|
for (SlaveResponseList::iterator it = m_slave_responses.begin();
|
|
it != m_slave_responses.end(); it++)
|
|
{
|
|
discard_if_response_differs(it->first, cmd, it->second, sescmd);
|
|
}
|
|
|
|
m_slave_responses.clear();
|
|
}
|
|
else
|
|
{
|
|
/** Record slave command so that the response can be validated
|
|
* against the master's response when it arrives. */
|
|
m_slave_responses.push_back(std::make_pair(backend, cmd));
|
|
}
|
|
}
|
|
else
|
|
{
|
|
if (cmd == MYSQL_REPLY_ERR && m_sescmd_responses[id] != MYSQL_REPLY_ERR)
|
|
{
|
|
MXS_INFO("Session command failed on slave '%s': %s",
|
|
backend->name(), extract_error(*ppPacket).c_str());
|
|
}
|
|
|
|
discard_if_response_differs(backend, m_sescmd_responses[id], cmd, sescmd);
|
|
}
|
|
|
|
if (discard)
|
|
{
|
|
gwbuf_free(*ppPacket);
|
|
*ppPacket = NULL;
|
|
}
|
|
}
|
|
}
|