MXS-852: Add support for text protocol prepared statements
When a statement is being prepared, the type and name of the statement is stored in the router session. If the name of a statement to be executed is found in the map, the query type that stored in the map is used.
This commit is contained in:
@ -1,4 +1,11 @@
|
|||||||
add_library(readwritesplit SHARED readwritesplit.cc rwsplit_mysql.cc rwsplit_route_stmt.cc rwsplit_select_backends.cc rwsplit_session_cmd.cc rwsplit_tmp_table_multi.cc)
|
add_library(readwritesplit SHARED
|
||||||
|
readwritesplit.cc
|
||||||
|
rwsplit_mysql.cc
|
||||||
|
rwsplit_route_stmt.cc
|
||||||
|
rwsplit_select_backends.cc
|
||||||
|
rwsplit_session_cmd.cc
|
||||||
|
rwsplit_tmp_table_multi.cc
|
||||||
|
rwsplit_ps.cc)
|
||||||
target_link_libraries(readwritesplit maxscale-common MySQLCommon)
|
target_link_libraries(readwritesplit maxscale-common MySQLCommon)
|
||||||
set_target_properties(readwritesplit PROPERTIES VERSION "1.0.2")
|
set_target_properties(readwritesplit PROPERTIES VERSION "1.0.2")
|
||||||
install_module(readwritesplit core)
|
install_module(readwritesplit core)
|
||||||
|
@ -21,6 +21,7 @@
|
|||||||
#include <maxscale/cppdefs.hh>
|
#include <maxscale/cppdefs.hh>
|
||||||
|
|
||||||
#include <tr1/unordered_set>
|
#include <tr1/unordered_set>
|
||||||
|
#include <tr1/unordered_map>
|
||||||
#include <map>
|
#include <map>
|
||||||
#include <string>
|
#include <string>
|
||||||
|
|
||||||
@ -200,6 +201,9 @@ typedef std::list<SRWBackend> SRWBackendList;
|
|||||||
typedef std::tr1::unordered_set<std::string> TableSet;
|
typedef std::tr1::unordered_set<std::string> TableSet;
|
||||||
typedef std::map<uint64_t, uint8_t> ResponseMap;
|
typedef std::map<uint64_t, uint8_t> ResponseMap;
|
||||||
|
|
||||||
|
/** Prepared statement ID to type maps for text and binary protocols */
|
||||||
|
typedef std::tr1::unordered_map<std::string, uint32_t> TextPSMap;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The client session structure used within this router.
|
* The client session structure used within this router.
|
||||||
*/
|
*/
|
||||||
@ -226,6 +230,7 @@ struct ROUTER_CLIENT_SES
|
|||||||
ResponseMap sescmd_responses; /**< Response to each session command */
|
ResponseMap sescmd_responses; /**< Response to each session command */
|
||||||
uint64_t sent_sescmd; /**< ID of the last sent session command*/
|
uint64_t sent_sescmd; /**< ID of the last sent session command*/
|
||||||
uint64_t recv_sescmd; /**< ID of the most recently completed session command */
|
uint64_t recv_sescmd; /**< ID of the most recently completed session command */
|
||||||
|
TextPSMap ps_text; /**< Text protocol prepared statements */
|
||||||
skygw_chk_t rses_chk_tail;
|
skygw_chk_t rses_chk_tail;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -14,6 +14,8 @@
|
|||||||
|
|
||||||
#include <maxscale/cppdefs.hh>
|
#include <maxscale/cppdefs.hh>
|
||||||
|
|
||||||
|
#include <string>
|
||||||
|
|
||||||
#include <maxscale/query_classifier.h>
|
#include <maxscale/query_classifier.h>
|
||||||
#include <maxscale/protocol/mysql.h>
|
#include <maxscale/protocol/mysql.h>
|
||||||
|
|
||||||
@ -110,3 +112,11 @@ bool check_for_multi_stmt(GWBUF *buf, void *protocol, uint8_t packet_type);
|
|||||||
uint32_t determine_query_type(GWBUF *querybuf, int packet_type, bool non_empty_packet);
|
uint32_t determine_query_type(GWBUF *querybuf, int packet_type, bool non_empty_packet);
|
||||||
|
|
||||||
void close_all_connections(ROUTER_CLIENT_SES* rses);
|
void close_all_connections(ROUTER_CLIENT_SES* rses);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Functions for prepared statement handling
|
||||||
|
*/
|
||||||
|
std::string extract_text_ps_id(GWBUF* buffer);
|
||||||
|
void store_text_ps(ROUTER_CLIENT_SES* rses, std::string id, GWBUF* buffer);
|
||||||
|
void erase_text_ps(ROUTER_CLIENT_SES* rses, std::string id);
|
||||||
|
bool get_text_ps_type(ROUTER_CLIENT_SES* rses, GWBUF* buffer, uint32_t* out);
|
||||||
|
@ -207,6 +207,15 @@ bool handle_target_is_all(route_target_t route_target, ROUTER_INSTANCE *inst,
|
|||||||
{
|
{
|
||||||
bool result = false;
|
bool result = false;
|
||||||
|
|
||||||
|
if (qc_query_is_type(qtype, QUERY_TYPE_PREPARE_NAMED_STMT))
|
||||||
|
{
|
||||||
|
store_text_ps(rses, extract_text_ps_id(querybuf), querybuf);
|
||||||
|
}
|
||||||
|
else if (qc_query_is_type(qtype, QUERY_TYPE_PREPARE_STMT))
|
||||||
|
{
|
||||||
|
gwbuf_set_type(querybuf, GWBUF_TYPE_COLLECT_RESULT);
|
||||||
|
}
|
||||||
|
|
||||||
if (TARGET_IS_MASTER(route_target) || TARGET_IS_SLAVE(route_target))
|
if (TARGET_IS_MASTER(route_target) || TARGET_IS_SLAVE(route_target))
|
||||||
{
|
{
|
||||||
/**
|
/**
|
||||||
|
68
server/modules/routing/readwritesplit/rwsplit_ps.cc
Normal file
68
server/modules/routing/readwritesplit/rwsplit_ps.cc
Normal file
@ -0,0 +1,68 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2016 MariaDB Corporation Ab
|
||||||
|
*
|
||||||
|
* Use of this software is governed by the Business Source License included
|
||||||
|
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
|
||||||
|
*
|
||||||
|
* Change Date: 2020-01-01
|
||||||
|
*
|
||||||
|
* On the date above, in accordance with the Business Source License, use
|
||||||
|
* of this software will be governed by version 2 or later of the General
|
||||||
|
* Public License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "readwritesplit.hh"
|
||||||
|
|
||||||
|
#include <maxscale/alloc.h>
|
||||||
|
#include <maxscale/query_classifier.h>
|
||||||
|
|
||||||
|
std::string extract_text_ps_id(GWBUF* buffer)
|
||||||
|
{
|
||||||
|
std::string rval;
|
||||||
|
char* name = qc_get_prepare_name(buffer);
|
||||||
|
|
||||||
|
if (name)
|
||||||
|
{
|
||||||
|
rval = name;
|
||||||
|
MXS_FREE(name);
|
||||||
|
}
|
||||||
|
|
||||||
|
return rval;
|
||||||
|
}
|
||||||
|
|
||||||
|
void store_text_ps(ROUTER_CLIENT_SES* rses, std::string id, GWBUF* buffer)
|
||||||
|
{
|
||||||
|
GWBUF* stmt = qc_get_preparable_stmt(buffer);
|
||||||
|
ss_dassert(stmt);
|
||||||
|
|
||||||
|
uint32_t type = qc_get_type_mask(stmt);
|
||||||
|
ss_dassert((type & (QUERY_TYPE_PREPARE_STMT | QUERY_TYPE_PREPARE_NAMED_STMT)) == 0);
|
||||||
|
|
||||||
|
rses->ps_text[id] = type;
|
||||||
|
}
|
||||||
|
|
||||||
|
void erase_text_ps(ROUTER_CLIENT_SES* rses, std::string id)
|
||||||
|
{
|
||||||
|
rses->ps_text.erase(id);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool get_text_ps_type(ROUTER_CLIENT_SES* rses, GWBUF* buffer, uint32_t* out)
|
||||||
|
{
|
||||||
|
bool rval = false;
|
||||||
|
char* name = qc_get_prepare_name(buffer);
|
||||||
|
|
||||||
|
if (name)
|
||||||
|
{
|
||||||
|
TextPSMap::iterator it = rses->ps_text.find(name);
|
||||||
|
|
||||||
|
if (it != rses->ps_text.end())
|
||||||
|
{
|
||||||
|
*out = it->second;
|
||||||
|
rval = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
MXS_FREE(name);
|
||||||
|
}
|
||||||
|
|
||||||
|
return rval;
|
||||||
|
}
|
@ -143,6 +143,15 @@ bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
|
|||||||
* - route primarily according to the hints and if they failed,
|
* - route primarily according to the hints and if they failed,
|
||||||
* eventually to master
|
* eventually to master
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
uint32_t ps_type;
|
||||||
|
|
||||||
|
if (qc_get_operation(querybuf) == QUERY_OP_EXECUTE &&
|
||||||
|
get_text_ps_type(rses, querybuf, &ps_type))
|
||||||
|
{
|
||||||
|
qtype = ps_type;
|
||||||
|
}
|
||||||
|
|
||||||
route_target = get_route_target(rses, qtype, querybuf->hint);
|
route_target = get_route_target(rses, qtype, querybuf->hint);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@ -511,6 +520,11 @@ route_target_t get_route_target(ROUTER_CLIENT_SES *rses,
|
|||||||
{
|
{
|
||||||
target = TARGET_MASTER;
|
target = TARGET_MASTER;
|
||||||
}
|
}
|
||||||
|
else if (qc_query_is_type(qtype, QUERY_TYPE_PREPARE_STMT) ||
|
||||||
|
qc_query_is_type(qtype, QUERY_TYPE_PREPARE_NAMED_STMT))
|
||||||
|
{
|
||||||
|
target = TARGET_ALL;
|
||||||
|
}
|
||||||
/**
|
/**
|
||||||
* These queries are not affected by hints
|
* These queries are not affected by hints
|
||||||
*/
|
*/
|
||||||
@ -539,9 +553,7 @@ route_target_t get_route_target(ROUTER_CLIENT_SES *rses,
|
|||||||
* the execution of the prepared statements to the right server would be
|
* the execution of the prepared statements to the right server would be
|
||||||
* an easy one. Currently this is not supported.
|
* an easy one. Currently this is not supported.
|
||||||
*/
|
*/
|
||||||
if (qc_query_is_type(qtype, QUERY_TYPE_READ) &&
|
if (qc_query_is_type(qtype, QUERY_TYPE_READ))
|
||||||
!(qc_query_is_type(qtype, QUERY_TYPE_PREPARE_STMT) ||
|
|
||||||
qc_query_is_type(qtype, QUERY_TYPE_PREPARE_NAMED_STMT)))
|
|
||||||
{
|
{
|
||||||
MXS_WARNING("The query can't be routed to all "
|
MXS_WARNING("The query can't be routed to all "
|
||||||
"backend servers because it includes SELECT and "
|
"backend servers because it includes SELECT and "
|
||||||
@ -561,8 +573,6 @@ route_target_t get_route_target(ROUTER_CLIENT_SES *rses,
|
|||||||
else if (!trx_active && !load_active &&
|
else if (!trx_active && !load_active &&
|
||||||
!qc_query_is_type(qtype, QUERY_TYPE_MASTER_READ) &&
|
!qc_query_is_type(qtype, QUERY_TYPE_MASTER_READ) &&
|
||||||
!qc_query_is_type(qtype, QUERY_TYPE_WRITE) &&
|
!qc_query_is_type(qtype, QUERY_TYPE_WRITE) &&
|
||||||
!qc_query_is_type(qtype, QUERY_TYPE_PREPARE_STMT) &&
|
|
||||||
!qc_query_is_type(qtype, QUERY_TYPE_PREPARE_NAMED_STMT) &&
|
|
||||||
(qc_query_is_type(qtype, QUERY_TYPE_READ) ||
|
(qc_query_is_type(qtype, QUERY_TYPE_READ) ||
|
||||||
qc_query_is_type(qtype, QUERY_TYPE_SHOW_TABLES) ||
|
qc_query_is_type(qtype, QUERY_TYPE_SHOW_TABLES) ||
|
||||||
qc_query_is_type(qtype, QUERY_TYPE_USERVAR_READ) ||
|
qc_query_is_type(qtype, QUERY_TYPE_USERVAR_READ) ||
|
||||||
@ -620,9 +630,7 @@ route_target_t get_route_target(ROUTER_CLIENT_SES *rses,
|
|||||||
qc_query_is_type(qtype, QUERY_TYPE_CREATE_TMP_TABLE) ||
|
qc_query_is_type(qtype, QUERY_TYPE_CREATE_TMP_TABLE) ||
|
||||||
qc_query_is_type(qtype, QUERY_TYPE_READ_TMP_TABLE) ||
|
qc_query_is_type(qtype, QUERY_TYPE_READ_TMP_TABLE) ||
|
||||||
qc_query_is_type(qtype, QUERY_TYPE_UNKNOWN)) ||
|
qc_query_is_type(qtype, QUERY_TYPE_UNKNOWN)) ||
|
||||||
qc_query_is_type(qtype, QUERY_TYPE_EXEC_STMT) ||
|
qc_query_is_type(qtype, QUERY_TYPE_EXEC_STMT));
|
||||||
qc_query_is_type(qtype, QUERY_TYPE_PREPARE_STMT) ||
|
|
||||||
qc_query_is_type(qtype, QUERY_TYPE_PREPARE_NAMED_STMT));
|
|
||||||
|
|
||||||
target = TARGET_MASTER;
|
target = TARGET_MASTER;
|
||||||
}
|
}
|
||||||
@ -1031,12 +1039,6 @@ handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
|
|||||||
/** The session command cursor must not be active */
|
/** The session command cursor must not be active */
|
||||||
ss_dassert(target->session_command_count() == 0);
|
ss_dassert(target->session_command_count() == 0);
|
||||||
|
|
||||||
/** We only want the complete response to the preparation */
|
|
||||||
if (MYSQL_GET_COMMAND(GWBUF_DATA(querybuf)) == MYSQL_COM_STMT_PREPARE)
|
|
||||||
{
|
|
||||||
gwbuf_set_type(querybuf, GWBUF_TYPE_COLLECT_RESULT);
|
|
||||||
}
|
|
||||||
|
|
||||||
mxs::Backend::response_type response = mxs::Backend::NO_RESPONSE;
|
mxs::Backend::response_type response = mxs::Backend::NO_RESPONSE;
|
||||||
mysql_server_cmd_t cmd = mxs_mysql_current_command(rses->client_dcb->session);
|
mysql_server_cmd_t cmd = mxs_mysql_current_command(rses->client_dcb->session);
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user