Merge branch '2.3' into 2.4
This commit is contained in:
commit
fcc19f3c66
@ -54,7 +54,7 @@ if(HAVE_SYSTEMD)
|
||||
elseif(NOT BUILD_SYSTEM_TESTS)
|
||||
# If systemd is in use, require libsystemd-dev to be installed
|
||||
if(NOT NOT_SYSTEMD_IS_RUNNING)
|
||||
message( FATAL_ERROR "systemd is running: please install libsystemd-dev" )
|
||||
message(FATAL_ERROR "systemd is running: please install libsystemd-dev (DEB) or systemd-devel (RPM)")
|
||||
endif()
|
||||
endif()
|
||||
|
||||
|
@ -40,9 +40,6 @@ set(CPACK_RPM_USER_FILELIST "${IGNORED_DIRS}")
|
||||
if(TARGET_COMPONENT STREQUAL "core" OR TARGET_COMPONENT STREQUAL "all")
|
||||
set(CPACK_RPM_POST_INSTALL_SCRIPT_FILE ${CMAKE_BINARY_DIR}/postinst)
|
||||
set(CPACK_RPM_POST_UNINSTALL_SCRIPT_FILE ${CMAKE_BINARY_DIR}/postrm)
|
||||
|
||||
# Installing this prevents RPM from deleting the /var/lib/maxscale folder
|
||||
install(DIRECTORY DESTINATION ${MAXSCALE_VARDIR}/lib/maxscale)
|
||||
endif()
|
||||
|
||||
if(EXTRA_PACKAGE_DEPENDENCIES)
|
||||
|
@ -234,6 +234,17 @@ public:
|
||||
return qc_query_is_type(m_route_info.type_mask(), QUERY_TYPE_BEGIN_TRX);
|
||||
}
|
||||
|
||||
/**
|
||||
* Whether the current binary protocol statement is a continuation of a previously executed statement.
|
||||
*
|
||||
* All COM_STMT_FETCH are continuations of a previously executed COM_STMT_EXECUTE. A COM_STMT_EXECUTE can
|
||||
* be a continuation if it has parameters but it doesn't provide the metadata for them.
|
||||
*/
|
||||
bool is_ps_continuation() const
|
||||
{
|
||||
return m_ps_continuation;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Store and process a prepared statement
|
||||
*
|
||||
@ -251,12 +262,16 @@ public:
|
||||
void ps_erase(GWBUF* buffer);
|
||||
|
||||
/**
|
||||
* @brief Store a mapping from an external id to the corresponding internal id
|
||||
* @brief Store a prepared statement response
|
||||
*
|
||||
* @param external_id The external id as seen by the client.
|
||||
* @param internal_id The corresponding internal id.
|
||||
* The response maps the internal ID to the external ID that is given to the client. It also collects
|
||||
* the number of parameters in the prepared statement which are required in some cases in the routing
|
||||
* process.
|
||||
*
|
||||
* @param internal_id The internal id (i.e. the session command number)
|
||||
* @param buffer The buffer containing the OK response to a COM_STMT_PREPARE
|
||||
*/
|
||||
void ps_id_internal_put(uint32_t external_id, uint32_t internal_id);
|
||||
void ps_store_response(uint32_t internal_id, GWBUF* buffer);
|
||||
|
||||
/**
|
||||
* @brief Update the current RouteInfo.
|
||||
@ -373,6 +388,8 @@ private:
|
||||
uint8_t packet_type,
|
||||
uint32_t* qtype);
|
||||
|
||||
bool query_continues_ps(uint8_t cmd, uint32_t stmt_id, GWBUF* buffer);
|
||||
|
||||
private:
|
||||
class PSManager;
|
||||
typedef std::shared_ptr<PSManager> SPSManager;
|
||||
@ -397,5 +414,6 @@ private:
|
||||
HandleMap m_ps_handles; /** External ID to internal ID */
|
||||
RouteInfo m_route_info;
|
||||
bool m_trx_is_read_only;
|
||||
bool m_ps_continuation;
|
||||
};
|
||||
}
|
||||
|
@ -179,11 +179,11 @@ void Mariadb_nodes::read_env()
|
||||
|
||||
// reading start_db_command
|
||||
sprintf(env_name, "%s_%03d_start_db_command", prefix, i);
|
||||
start_db_command[i] = readenv(env_name, (char *) "service mysql start");
|
||||
start_db_command[i] = readenv(env_name, (char *) "systemctl start mariadb || service mysql start");
|
||||
|
||||
// reading stop_db_command
|
||||
sprintf(env_name, "%s_%03d_stop_db_command", prefix, i);
|
||||
stop_db_command[i] = readenv(env_name, (char *) "service mysql stop");
|
||||
stop_db_command[i] = readenv(env_name, (char *) "systemctl stop mariadb || service mysql stop");
|
||||
|
||||
// reading cleanup_db_command
|
||||
sprintf(env_name, "%s_%03d_cleanup_db_command", prefix, i);
|
||||
|
@ -5,63 +5,87 @@
|
||||
|
||||
#include "testconnections.h"
|
||||
|
||||
int main(int argc, char** argv)
|
||||
void do_test(TestConnections& test, MYSQL* conn)
|
||||
{
|
||||
TestConnections test(argc, argv);
|
||||
test.maxscales->connect();
|
||||
|
||||
auto conn = test.maxscales->conn_rwsplit[0];
|
||||
|
||||
test.try_query(conn, "DROP TABLE IF EXISTS double_execute;");
|
||||
test.try_query(conn, "CREATE TABLE double_execute(a int);");
|
||||
test.try_query(conn, "INSERT INTO double_execute VALUES (123), (456)");
|
||||
|
||||
auto stmt = mysql_stmt_init(conn);
|
||||
std::string sql = "select a, @@server_id from double_execute where a = ?";
|
||||
test.expect(mysql_stmt_prepare(stmt, sql.c_str(), sql.length()) == 0,
|
||||
"Prepare should work: %s", mysql_error(conn));
|
||||
|
||||
int data[2] = {0, 0};
|
||||
MYSQL_BIND my_bind[2] = {};
|
||||
char is_null = 0;
|
||||
my_bind[0].buffer_type = MYSQL_TYPE_LONG;
|
||||
my_bind[0].buffer = &data[0];
|
||||
my_bind[0].buffer_length = sizeof(data[0]);
|
||||
my_bind[0].is_null = &is_null;
|
||||
my_bind[1].buffer_type = MYSQL_TYPE_LONG;
|
||||
my_bind[1].buffer = &data[1];
|
||||
my_bind[1].buffer_length = sizeof(data[2]);
|
||||
my_bind[1].is_null = &is_null;
|
||||
data[1] = 123;
|
||||
test.expect(mysql_stmt_bind_param(stmt, my_bind) == 0, "Bind: %s", mysql_stmt_error(stmt));
|
||||
int data_out = 123;
|
||||
MYSQL_BIND bind_out;
|
||||
char null_out = 0;
|
||||
bind_out.buffer_type = MYSQL_TYPE_LONG;
|
||||
bind_out.buffer = &data_out;
|
||||
bind_out.buffer_length = sizeof(data_out);
|
||||
bind_out.is_null = &null_out;
|
||||
test.expect(mysql_stmt_bind_param(stmt, &bind_out) == 0, "Bind: %s", mysql_stmt_error(stmt));
|
||||
|
||||
// The first execute is done on the master
|
||||
test.try_query(conn, "BEGIN");
|
||||
|
||||
test.expect(mysql_stmt_execute(stmt) == 0, "First execute should work: %s", mysql_stmt_error(stmt));
|
||||
data[0] = 0;
|
||||
|
||||
int data_in[2] = {};
|
||||
MYSQL_BIND bind_in[2] = {};
|
||||
char null_in[2] = {};
|
||||
|
||||
for (int i = 0; i < 2; i++)
|
||||
{
|
||||
bind_in[i].buffer_type = MYSQL_TYPE_LONG;
|
||||
bind_in[i].buffer = &data_in[i];
|
||||
bind_in[i].buffer_length = sizeof(data_in[i]);
|
||||
bind_in[i].is_null = &null_in[i];
|
||||
}
|
||||
|
||||
mysql_stmt_bind_result(stmt, bind_in);
|
||||
mysql_stmt_store_result(stmt);
|
||||
|
||||
test.expect(mysql_stmt_fetch(stmt) == 0, "First fetch of first execute should work");
|
||||
test.expect(data[0] == 123, "Query should return one row with value 123: `%d`", data[0]);
|
||||
test.expect(data_in[0] == 123, "Query should return one row with value 123: `%d`", data_in[0]);
|
||||
test.expect(mysql_stmt_fetch(stmt) != 0, "Second fetch of first execute should NOT work");
|
||||
|
||||
int first_server = data_in[1];
|
||||
|
||||
test.try_query(conn, "COMMIT");
|
||||
|
||||
// The second execute goes to a slave, no new parameters are sent in it
|
||||
data[0] = 123;
|
||||
memset(data_in, 0, sizeof(data_in));
|
||||
test.expect(mysql_stmt_execute(stmt) == 0, "Second execute should work: %s", mysql_stmt_error(stmt));
|
||||
data[0] = 0;
|
||||
|
||||
mysql_stmt_bind_result(stmt, bind_in);
|
||||
mysql_stmt_store_result(stmt);
|
||||
|
||||
test.expect(mysql_stmt_fetch(stmt) == 0, "First fetch of second execute should work");
|
||||
test.expect(data[0] == 123, "Query should return one row with value 123: `%d`", data[0]);
|
||||
test.expect(data_in[0] == 123, "Query should return one row with value 123: `%d`", data_in[0]);
|
||||
test.expect(data_in[1] == first_server,
|
||||
"The query should be routed to the server with server_id %d, not %d",
|
||||
first_server, data_in[1]);
|
||||
test.expect(mysql_stmt_fetch(stmt) != 0, "Second fetch of second execute should NOT work");
|
||||
|
||||
mysql_stmt_close(stmt);
|
||||
}
|
||||
|
||||
test.try_query(conn, "DROP TABLE IF EXISTS double_execute;");
|
||||
int main(int argc, char** argv)
|
||||
{
|
||||
TestConnections test(argc, argv);
|
||||
|
||||
test.repl->connect();
|
||||
test.maxscales->connect();
|
||||
|
||||
// Prepare a table
|
||||
test.try_query(test.repl->nodes[0], "DROP TABLE IF EXISTS double_execute;");
|
||||
test.try_query(test.repl->nodes[0], "CREATE TABLE double_execute(a int);");
|
||||
test.try_query(test.repl->nodes[0], "INSERT INTO double_execute VALUES (123), (456)");
|
||||
test.repl->sync_slaves();
|
||||
|
||||
test.tprintf("Running test with a direct connection");
|
||||
do_test(test, test.repl->nodes[0]);
|
||||
|
||||
test.tprintf("Running test through readwritesplit");
|
||||
do_test(test, test.maxscales->conn_rwsplit[0]);
|
||||
|
||||
test.try_query(test.repl->nodes[0], "DROP TABLE IF EXISTS double_execute;");
|
||||
|
||||
return test.global_result;
|
||||
}
|
||||
|
@ -75,6 +75,19 @@ uint32_t qc_mysql_extract_ps_id(GWBUF* buffer)
|
||||
return rval;
|
||||
}
|
||||
|
||||
uint16_t qc_extract_ps_param_count(GWBUF* buffer)
|
||||
{
|
||||
uint16_t rval = 0;
|
||||
uint8_t params[MYSQL_PS_PARAMS_SIZE];
|
||||
|
||||
if (gwbuf_copy_data(buffer, MYSQL_PS_PARAMS_OFFSET, sizeof(params), params) == sizeof(params))
|
||||
{
|
||||
rval = gw_mysql_get_byte2(params);
|
||||
}
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
||||
bool have_semicolon(const char* ptr, int len)
|
||||
{
|
||||
for (int i = 0; i < len; i++)
|
||||
@ -265,7 +278,7 @@ public:
|
||||
break;
|
||||
|
||||
case MXS_COM_STMT_PREPARE:
|
||||
m_binary_ps[id] = get_prepare_type(buffer);
|
||||
m_binary_ps[id].type = get_prepare_type(buffer);
|
||||
break;
|
||||
|
||||
default:
|
||||
@ -281,7 +294,7 @@ public:
|
||||
|
||||
if (it != m_binary_ps.end())
|
||||
{
|
||||
rval = it->second;
|
||||
rval = it->second.type;
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -342,8 +355,32 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
void set_param_count(uint32_t id, uint16_t param_count)
|
||||
{
|
||||
m_binary_ps[id].param_count = param_count;
|
||||
}
|
||||
|
||||
uint16_t param_count(uint32_t id) const
|
||||
{
|
||||
uint16_t rval = 0;
|
||||
auto it = m_binary_ps.find(id);
|
||||
|
||||
if (it != m_binary_ps.end())
|
||||
{
|
||||
rval = it->second.param_count;
|
||||
}
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
||||
private:
|
||||
typedef std::unordered_map<uint32_t, uint32_t> BinaryPSMap;
|
||||
struct BinaryPS
|
||||
{
|
||||
uint32_t type = 0;
|
||||
uint16_t param_count = 0;
|
||||
};
|
||||
|
||||
typedef std::unordered_map<uint32_t, BinaryPS> BinaryPSMap;
|
||||
typedef std::unordered_map<std::string, uint32_t> TextPSMap;
|
||||
|
||||
private:
|
||||
@ -368,6 +405,7 @@ QueryClassifier::QueryClassifier(Handler* pHandler,
|
||||
, m_multi_statements_allowed(are_multi_statements_allowed(pSession))
|
||||
, m_sPs_manager(new PSManager)
|
||||
, m_trx_is_read_only(true)
|
||||
, m_ps_continuation(false)
|
||||
{
|
||||
}
|
||||
|
||||
@ -622,9 +660,15 @@ uint32_t QueryClassifier::ps_id_internal_get(GWBUF* pBuffer)
|
||||
return internal_id;
|
||||
}
|
||||
|
||||
void QueryClassifier::ps_id_internal_put(uint32_t external_id, uint32_t internal_id)
|
||||
void QueryClassifier::ps_store_response(uint32_t internal_id, GWBUF* buffer)
|
||||
{
|
||||
auto external_id = qc_mysql_extract_ps_id(buffer);
|
||||
m_ps_handles[external_id] = internal_id;
|
||||
|
||||
if (auto param_count = qc_extract_ps_param_count(buffer))
|
||||
{
|
||||
m_sPs_manager->set_param_count(internal_id, param_count);
|
||||
}
|
||||
}
|
||||
|
||||
void QueryClassifier::log_transaction_status(GWBUF* querybuf, uint32_t qtype)
|
||||
@ -909,6 +953,38 @@ QueryClassifier::current_target_t QueryClassifier::handle_multi_temp_and_load(
|
||||
return rv;
|
||||
}
|
||||
|
||||
bool QueryClassifier::query_continues_ps(uint8_t cmd, uint32_t stmt_id, GWBUF* buffer)
|
||||
{
|
||||
bool rval = false;
|
||||
|
||||
if (cmd == COM_STMT_FETCH)
|
||||
{
|
||||
// COM_STMT_FETCH should always go to the same target as the COM_STMT_EXECUTE
|
||||
rval = true;
|
||||
}
|
||||
else if (cmd == MXS_COM_STMT_EXECUTE)
|
||||
{
|
||||
if (auto params = m_sPs_manager->param_count(stmt_id))
|
||||
{
|
||||
size_t types_offset = MYSQL_HEADER_LEN + 1 + 4 + 1 + 4 + ((params + 7) / 8);
|
||||
uint8_t have_types = 0;
|
||||
|
||||
if (gwbuf_copy_data(buffer, types_offset, 1, &have_types))
|
||||
{
|
||||
if (have_types == 0)
|
||||
{
|
||||
// A previous COM_STMT_EXECUTE provided the field types, and this one relies on the
|
||||
// previous one. This means that this query must be routed to the same server where the
|
||||
// previous COM_STMT_EXECUTE was routed.
|
||||
rval = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
||||
QueryClassifier::RouteInfo QueryClassifier::update_route_info(
|
||||
QueryClassifier::current_target_t current_target,
|
||||
GWBUF* pBuffer)
|
||||
@ -1005,6 +1081,7 @@ QueryClassifier::RouteInfo QueryClassifier::update_route_info(
|
||||
{
|
||||
stmt_id = ps_id_internal_get(pBuffer);
|
||||
type_mask = ps_get_type(stmt_id);
|
||||
m_ps_continuation = query_continues_ps(command, stmt_id, pBuffer);
|
||||
}
|
||||
|
||||
route_target = get_route_target(command, type_mask);
|
||||
|
@ -1277,8 +1277,8 @@ bool mxs_mysql_extract_ps_response(GWBUF* buffer, MXS_PS_RESPONSE* out)
|
||||
{
|
||||
bool rval = false;
|
||||
uint8_t id[MYSQL_PS_ID_SIZE];
|
||||
uint8_t cols[MYSQL_PS_ID_SIZE];
|
||||
uint8_t params[MYSQL_PS_ID_SIZE];
|
||||
uint8_t cols[MYSQL_PS_COLS_SIZE];
|
||||
uint8_t params[MYSQL_PS_PARAMS_SIZE];
|
||||
uint8_t warnings[MYSQL_PS_WARN_SIZE];
|
||||
|
||||
if (gwbuf_copy_data(buffer, MYSQL_PS_ID_OFFSET, sizeof(id), id) == sizeof(id)
|
||||
|
@ -822,10 +822,8 @@ RWBackend* RWSplitSession::handle_slave_is_target(uint8_t cmd, uint32_t stmt_id)
|
||||
int rlag_max = get_max_replication_lag();
|
||||
RWBackend* target = nullptr;
|
||||
|
||||
if (cmd == MXS_COM_STMT_FETCH)
|
||||
if (m_qc.is_ps_continuation())
|
||||
{
|
||||
/** The COM_STMT_FETCH must be executed on the same server as the
|
||||
* COM_STMT_EXECUTE was executed on */
|
||||
ExecMap::iterator it = m_exec_map.find(stmt_id);
|
||||
|
||||
if (it != m_exec_map.end())
|
||||
|
@ -128,7 +128,7 @@ void RWSplitSession::process_sescmd_response(RWBackend* backend, GWBUF** ppPacke
|
||||
{
|
||||
/** Map the returned response to the internal ID */
|
||||
MXS_INFO("PS ID %u maps to internal ID %lu", resp.id, id);
|
||||
m_qc.ps_id_internal_put(resp.id, id);
|
||||
m_qc.ps_store_response(id, *ppPacket);
|
||||
}
|
||||
|
||||
// Discard any slave connections that did not return the same result
|
||||
|
Loading…
x
Reference in New Issue
Block a user