MXS-2521:Route subseqenct COM_STMT_EXECUTE to the same server which first COM_STMT_EXECUTE was executed on

This commit is contained in:
wuzang.hdp 2019-06-20 16:10:19 +08:00 committed by Johan Wikman
parent d15582d26d
commit 8d50450b5a
5 changed files with 60 additions and 17 deletions

View File

@ -78,7 +78,7 @@ void handle_multi_temp_and_load(RWSplitSession *rses, GWBUF *querybuf,
SRWBackend handle_hinted_target(RWSplitSession *rses, GWBUF *querybuf,
route_target_t route_target);
SRWBackend handle_slave_is_target(RWSplit *inst, RWSplitSession *rses,
uint8_t cmd, uint32_t id);
const GWBUF *query, const RouteInfo& info);
bool handle_master_is_target(RWSplit *inst, RWSplitSession *rses,
SRWBackend* dest);
bool handle_got_target(RWSplit *inst, RWSplitSession *rses,

View File

@ -93,7 +93,7 @@ void handle_connection_keepalive(RWSplit *inst, RWSplitSession *rses,
}
route_target_t get_target_type(RWSplitSession *rses, GWBUF *buffer,
uint8_t* command, uint32_t* type, uint32_t* stmt_id)
uint8_t* command, uint32_t* type, uint32_t* stmt_id, uint16_t* n_params)
{
route_target_t route_target = TARGET_MASTER;
bool in_read_only_trx = rses->target_node && session_trx_is_read_only(rses->client_dcb->session);
@ -161,7 +161,7 @@ route_target_t get_target_type(RWSplitSession *rses, GWBUF *buffer,
}
else if (is_ps_command(*command))
{
*stmt_id = get_internal_ps_id(rses, buffer);
*stmt_id = get_internal_ps_id(rses, buffer, n_params);
*type = rses->ps_manager.get_type(*stmt_id);
}
@ -250,7 +250,7 @@ bool route_single_stmt(RWSplit *inst, RWSplitSession *rses, GWBUF *querybuf, con
}
else if (TARGET_IS_SLAVE(route_target))
{
if ((target = handle_slave_is_target(inst, rses, command, stmt_id)))
if ((target = handle_slave_is_target(inst, rses, querybuf, info)))
{
succp = true;
@ -999,6 +999,38 @@ SRWBackend handle_hinted_target(RWSplitSession *rses, GWBUF *querybuf,
return target;
}
/**
* @brief Determine whether this stmt is subsequent COM_STMT_EXECUTE
*
* @param cmd command type
* @param query GWBUF including the query
* @param info Route info
*
* return is subsequent COM_STMT_EXECUTE
*/
bool is_sub_stmt_exec(uint8_t cmd, const GWBUF *query, uint16_t n_params)
{
if (cmd != COM_STMT_EXECUTE || n_params == 0)
{
return false;
}
bool rval = true;
/*https://mariadb.com/kb/en/library/com_stmt_execute/*/
/*need n_params to parse new_params_bound_flag(alias send type to server)*/
int new_params_bound_flag_offset = MYSQL_HEADER_LEN + 10 + (n_params + 7) / 8;
ss_dassert(gwbuf_length(query) <= new_params_bound_flag_offset);
uint8_t data[new_params_bound_flag_offset];
gwbuf_copy_data(query, 0, new_params_bound_flag_offset, data);
if (data[new_params_bound_flag_offset])
{
rval = false;
}
return rval;
}
/**
* @brief Handle slave is the target
*
@ -1006,20 +1038,27 @@ SRWBackend handle_hinted_target(RWSplitSession *rses, GWBUF *querybuf,
*
* @param inst Router instance
* @param ses Router session
* @param target_dcb DCB for the target server
* @param query GWBUF including the query
* @param info Holding routing related information
*
* @return bool - true if succeeded, false otherwise
*/
SRWBackend handle_slave_is_target(RWSplit *inst, RWSplitSession *rses,
uint8_t cmd, uint32_t stmt_id)
const GWBUF *query, const RouteInfo& info)
{
int rlag_max = rses_get_max_replication_lag(rses);
SRWBackend target;
uint8_t cmd = info.command;
uint32_t stmt_id = info.stmt_id;
uint16_t n_params = info.n_params;
if (cmd == MXS_COM_STMT_FETCH)
if (cmd == MXS_COM_STMT_FETCH || is_sub_stmt_exec(cmd, query, n_params))
{
/** The COM_STMT_FETCH must be executed on the same server as the
* COM_STMT_EXECUTE was executed on */
* COM_STMT_EXECUTE was executed on, the subsequent COM_STMT_EXECUTE also
*/
const char* command_str = cmd == MXS_COM_STMT_FETCH ? "COM_STMT_FETCH" : "subseqent COM_STMT_EXECUTE";
ExecMap::iterator it = rses->exec_map.find(stmt_id);
if (it != rses->exec_map.end())
@ -1027,17 +1066,17 @@ SRWBackend handle_slave_is_target(RWSplit *inst, RWSplitSession *rses,
if (it->second->in_use())
{
target = it->second;
MXS_INFO("COM_STMT_FETCH on %s", target->name());
MXS_INFO("%s on %s", command_str, target->name());
}
else
{
MXS_ERROR("Old COM_STMT_EXECUTE target %s not in use, cannot "
"proceed with COM_STMT_FETCH", it->second->name());
"proceed with %s", it->second->name(), command_str);
}
}
else
{
MXS_WARNING("Unknown statement ID %u used in COM_STMT_FETCH", stmt_id);
MXS_WARNING("Unknown statement ID %u used in %s", stmt_id, command_str);
}
}
else

View File

@ -122,7 +122,7 @@ void process_sescmd_response(RWSplitSession* rses, SRWBackend& backend,
{
/** Map the returned response to the internal ID */
MXS_INFO("PS ID %u maps to internal ID %lu", resp.id, id);
rses->ps_handles[resp.id] = id;
rses->ps_handles[resp.id] = (id << 16) + resp.parameters;
}
// Discard any slave connections that did not return the same result

View File

@ -110,7 +110,7 @@ bool RWBackend::consume_fetched_rows(GWBUF* buffer)
return m_expected_rows == 0;
}
uint32_t get_internal_ps_id(RWSplitSession* rses, GWBUF* buffer)
uint32_t get_internal_ps_id(RWSplitSession* rses, GWBUF* buffer, uint16_t* n_params)
{
uint32_t rval = 0;
@ -120,7 +120,8 @@ uint32_t get_internal_ps_id(RWSplitSession* rses, GWBUF* buffer)
if (it != rses->ps_handles.end())
{
rval = it->second;
rval = it->second >> 16;
*n_params = it->second & 0xffff;
}
else
{
@ -135,7 +136,8 @@ RouteInfo::RouteInfo(RWSplitSession* rses, GWBUF* buffer):
target(TARGET_UNDEFINED),
command(0xff),
type(QUERY_TYPE_UNKNOWN),
stmt_id(0)
stmt_id(0),
n_params(0)
{
target = get_target_type(rses, buffer, &command, &type, &stmt_id);
}

View File

@ -32,7 +32,7 @@ enum reply_state_t
rstostr((a)->get_reply_state()), rstostr(b));
typedef std::map<uint32_t, uint32_t> BackendHandleMap; /** Internal ID to external ID */
typedef std::map<uint32_t, uint32_t> ClientHandleMap; /** External ID to internal ID */
typedef std::map<uint32_t, uint64_t> ClientHandleMap; /** External ID to internal ID */
class RWBackend: public mxs::Backend
{
@ -171,6 +171,7 @@ struct RouteInfo
uint8_t command; /**< The command byte, 0xff for unknown commands */
uint32_t type; /**< The query type, QUERY_TYPE_UNKNOWN for unknown types*/
uint32_t stmt_id; /**< Prepared statement ID, 0 for unknown */
uint16_t n_params; /**< Prepared statement params count */
};
/**
@ -202,7 +203,8 @@ static inline const char* rstostr(reply_state_t state)
*
* @param rses Router client session
* @param buffer Buffer containing a binary protocol statement other than COM_STMT_PREPARE
* @param n_params statement parmas number
*
* @return The internal ID of the prepared statement that the buffer contents refer to
*/
uint32_t get_internal_ps_id(RWSplitSession* rses, GWBUF* buffer);
uint32_t get_internal_ps_id(RWSplitSession* rses, GWBUF* buffer, uint16_t* n_params);