MXS-2521: Detect COM_STMT_EXECUTE without metadata
If a COM_STMT_EXECUTE has no metadata in it and it has more than one parameter, it must be routed to the same backend where the previous COM_STMT_EXECUTE with the same ID was routed to. This prevents MDEV-19811 that is triggered by MaxScale routing the queries to different backends.
This commit is contained in:
@ -234,6 +234,17 @@ public:
|
|||||||
return qc_query_is_type(m_route_info.type_mask(), QUERY_TYPE_BEGIN_TRX);
|
return qc_query_is_type(m_route_info.type_mask(), QUERY_TYPE_BEGIN_TRX);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Whether the current binary protocol statement is a continuation of a previously executed statement.
|
||||||
|
*
|
||||||
|
* All COM_STMT_FETCH are continuations of a previously executed COM_STMT_EXECUTE. A COM_STMT_EXECUTE can
|
||||||
|
* be a continuation if it has parameters but it doesn't provide the metadata for them.
|
||||||
|
*/
|
||||||
|
bool is_ps_continuation() const
|
||||||
|
{
|
||||||
|
return m_ps_continuation;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Store and process a prepared statement
|
* @brief Store and process a prepared statement
|
||||||
*
|
*
|
||||||
@ -251,12 +262,16 @@ public:
|
|||||||
void ps_erase(GWBUF* buffer);
|
void ps_erase(GWBUF* buffer);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Store a mapping from an external id to the corresponding internal id
|
* @brief Store a prepared statement response
|
||||||
*
|
*
|
||||||
* @param external_id The external id as seen by the client.
|
* The response maps the internal ID to the external ID that is given to the client. It also collects
|
||||||
* @param internal_id The corresponding internal id.
|
* the number of parameters in the prepared statement which are required in some cases in the routing
|
||||||
|
* process.
|
||||||
|
*
|
||||||
|
* @param internal_id The internal id (i.e. the session command number)
|
||||||
|
* @param buffer The buffer containing the OK response to a COM_STMT_PREPARE
|
||||||
*/
|
*/
|
||||||
void ps_id_internal_put(uint32_t external_id, uint32_t internal_id);
|
void ps_store_response(uint32_t internal_id, GWBUF* buffer);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Update the current RouteInfo.
|
* @brief Update the current RouteInfo.
|
||||||
@ -373,6 +388,8 @@ private:
|
|||||||
uint8_t packet_type,
|
uint8_t packet_type,
|
||||||
uint32_t* qtype);
|
uint32_t* qtype);
|
||||||
|
|
||||||
|
bool query_continues_ps(uint8_t cmd, uint32_t stmt_id, GWBUF* buffer);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
class PSManager;
|
class PSManager;
|
||||||
typedef std::shared_ptr<PSManager> SPSManager;
|
typedef std::shared_ptr<PSManager> SPSManager;
|
||||||
@ -397,5 +414,6 @@ private:
|
|||||||
HandleMap m_ps_handles; /** External ID to internal ID */
|
HandleMap m_ps_handles; /** External ID to internal ID */
|
||||||
RouteInfo m_route_info;
|
RouteInfo m_route_info;
|
||||||
bool m_trx_is_read_only;
|
bool m_trx_is_read_only;
|
||||||
|
bool m_ps_continuation;
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
@ -75,6 +75,19 @@ uint32_t qc_mysql_extract_ps_id(GWBUF* buffer)
|
|||||||
return rval;
|
return rval;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
uint16_t qc_extract_ps_param_count(GWBUF* buffer)
|
||||||
|
{
|
||||||
|
uint16_t rval = 0;
|
||||||
|
uint8_t params[MYSQL_PS_PARAMS_SIZE];
|
||||||
|
|
||||||
|
if (gwbuf_copy_data(buffer, MYSQL_PS_PARAMS_OFFSET, sizeof(params), params) == sizeof(params))
|
||||||
|
{
|
||||||
|
rval = gw_mysql_get_byte2(params);
|
||||||
|
}
|
||||||
|
|
||||||
|
return rval;
|
||||||
|
}
|
||||||
|
|
||||||
bool have_semicolon(const char* ptr, int len)
|
bool have_semicolon(const char* ptr, int len)
|
||||||
{
|
{
|
||||||
for (int i = 0; i < len; i++)
|
for (int i = 0; i < len; i++)
|
||||||
@ -265,7 +278,7 @@ public:
|
|||||||
break;
|
break;
|
||||||
|
|
||||||
case MXS_COM_STMT_PREPARE:
|
case MXS_COM_STMT_PREPARE:
|
||||||
m_binary_ps[id] = get_prepare_type(buffer);
|
m_binary_ps[id].type = get_prepare_type(buffer);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
default:
|
default:
|
||||||
@ -281,7 +294,7 @@ public:
|
|||||||
|
|
||||||
if (it != m_binary_ps.end())
|
if (it != m_binary_ps.end())
|
||||||
{
|
{
|
||||||
rval = it->second;
|
rval = it->second.type;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -342,8 +355,32 @@ public:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void set_param_count(uint32_t id, uint16_t param_count)
|
||||||
|
{
|
||||||
|
m_binary_ps[id].param_count = param_count;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint16_t param_count(uint32_t id) const
|
||||||
|
{
|
||||||
|
uint16_t rval = 0;
|
||||||
|
auto it = m_binary_ps.find(id);
|
||||||
|
|
||||||
|
if (it != m_binary_ps.end())
|
||||||
|
{
|
||||||
|
rval = it->second.param_count;
|
||||||
|
}
|
||||||
|
|
||||||
|
return rval;
|
||||||
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
typedef std::unordered_map<uint32_t, uint32_t> BinaryPSMap;
|
struct BinaryPS
|
||||||
|
{
|
||||||
|
uint32_t type = 0;
|
||||||
|
uint16_t param_count = 0;
|
||||||
|
};
|
||||||
|
|
||||||
|
typedef std::unordered_map<uint32_t, BinaryPS> BinaryPSMap;
|
||||||
typedef std::unordered_map<std::string, uint32_t> TextPSMap;
|
typedef std::unordered_map<std::string, uint32_t> TextPSMap;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
@ -368,6 +405,7 @@ QueryClassifier::QueryClassifier(Handler* pHandler,
|
|||||||
, m_multi_statements_allowed(are_multi_statements_allowed(pSession))
|
, m_multi_statements_allowed(are_multi_statements_allowed(pSession))
|
||||||
, m_sPs_manager(new PSManager)
|
, m_sPs_manager(new PSManager)
|
||||||
, m_trx_is_read_only(true)
|
, m_trx_is_read_only(true)
|
||||||
|
, m_ps_continuation(false)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -622,9 +660,15 @@ uint32_t QueryClassifier::ps_id_internal_get(GWBUF* pBuffer)
|
|||||||
return internal_id;
|
return internal_id;
|
||||||
}
|
}
|
||||||
|
|
||||||
void QueryClassifier::ps_id_internal_put(uint32_t external_id, uint32_t internal_id)
|
void QueryClassifier::ps_store_response(uint32_t internal_id, GWBUF* buffer)
|
||||||
{
|
{
|
||||||
|
auto external_id = qc_mysql_extract_ps_id(buffer);
|
||||||
m_ps_handles[external_id] = internal_id;
|
m_ps_handles[external_id] = internal_id;
|
||||||
|
|
||||||
|
if (auto param_count = qc_extract_ps_param_count(buffer))
|
||||||
|
{
|
||||||
|
m_sPs_manager->set_param_count(internal_id, param_count);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void QueryClassifier::log_transaction_status(GWBUF* querybuf, uint32_t qtype)
|
void QueryClassifier::log_transaction_status(GWBUF* querybuf, uint32_t qtype)
|
||||||
@ -909,6 +953,38 @@ QueryClassifier::current_target_t QueryClassifier::handle_multi_temp_and_load(
|
|||||||
return rv;
|
return rv;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool QueryClassifier::query_continues_ps(uint8_t cmd, uint32_t stmt_id, GWBUF* buffer)
|
||||||
|
{
|
||||||
|
bool rval = false;
|
||||||
|
|
||||||
|
if (cmd == COM_STMT_FETCH)
|
||||||
|
{
|
||||||
|
// COM_STMT_FETCH should always go to the same target as the COM_STMT_EXECUTE
|
||||||
|
rval = true;
|
||||||
|
}
|
||||||
|
else if (cmd == MXS_COM_STMT_EXECUTE)
|
||||||
|
{
|
||||||
|
if (auto params = m_sPs_manager->param_count(stmt_id))
|
||||||
|
{
|
||||||
|
size_t types_offset = MYSQL_HEADER_LEN + 1 + 4 + 1 + 4 + ((params + 7) / 8);
|
||||||
|
uint8_t have_types = 0;
|
||||||
|
|
||||||
|
if (gwbuf_copy_data(buffer, types_offset, 1, &have_types))
|
||||||
|
{
|
||||||
|
if (have_types == 0)
|
||||||
|
{
|
||||||
|
// A previous COM_STMT_EXECUTE provided the field types, and this one relies on the
|
||||||
|
// previous one. This means that this query must be routed to the same server where the
|
||||||
|
// previous COM_STMT_EXECUTE was routed.
|
||||||
|
rval = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return rval;
|
||||||
|
}
|
||||||
|
|
||||||
QueryClassifier::RouteInfo QueryClassifier::update_route_info(
|
QueryClassifier::RouteInfo QueryClassifier::update_route_info(
|
||||||
QueryClassifier::current_target_t current_target,
|
QueryClassifier::current_target_t current_target,
|
||||||
GWBUF* pBuffer)
|
GWBUF* pBuffer)
|
||||||
@ -1005,6 +1081,7 @@ QueryClassifier::RouteInfo QueryClassifier::update_route_info(
|
|||||||
{
|
{
|
||||||
stmt_id = ps_id_internal_get(pBuffer);
|
stmt_id = ps_id_internal_get(pBuffer);
|
||||||
type_mask = ps_get_type(stmt_id);
|
type_mask = ps_get_type(stmt_id);
|
||||||
|
m_ps_continuation = query_continues_ps(command, stmt_id, pBuffer);
|
||||||
}
|
}
|
||||||
|
|
||||||
route_target = get_route_target(command, type_mask);
|
route_target = get_route_target(command, type_mask);
|
||||||
|
@ -895,10 +895,8 @@ SRWBackend RWSplitSession::handle_slave_is_target(uint8_t cmd, uint32_t stmt_id)
|
|||||||
int rlag_max = get_max_replication_lag();
|
int rlag_max = get_max_replication_lag();
|
||||||
SRWBackend target;
|
SRWBackend target;
|
||||||
|
|
||||||
if (cmd == MXS_COM_STMT_FETCH)
|
if (m_qc.is_ps_continuation())
|
||||||
{
|
{
|
||||||
/** The COM_STMT_FETCH must be executed on the same server as the
|
|
||||||
* COM_STMT_EXECUTE was executed on */
|
|
||||||
ExecMap::iterator it = m_exec_map.find(stmt_id);
|
ExecMap::iterator it = m_exec_map.find(stmt_id);
|
||||||
|
|
||||||
if (it != m_exec_map.end())
|
if (it != m_exec_map.end())
|
||||||
|
@ -126,7 +126,7 @@ void RWSplitSession::process_sescmd_response(SRWBackend& backend, GWBUF** ppPack
|
|||||||
{
|
{
|
||||||
/** Map the returned response to the internal ID */
|
/** Map the returned response to the internal ID */
|
||||||
MXS_INFO("PS ID %u maps to internal ID %lu", resp.id, id);
|
MXS_INFO("PS ID %u maps to internal ID %lu", resp.id, id);
|
||||||
m_qc.ps_id_internal_put(resp.id, id);
|
m_qc.ps_store_response(id, *ppPacket);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Discard any slave connections that did not return the same result
|
// Discard any slave connections that did not return the same result
|
||||||
|
Reference in New Issue
Block a user