MXS-1804: Allow large session commands
Session commands that span multiple packets are now allowed and will work. However, if one is executed the session command history is disabled as no interface for appending to session commands exists. The backend protocol modules now also correctly track the current command. This was a pre-requisite for large session commands as they needed to be gathered into a single buffer and to do this the current command had to be accurate. Updated tests to expect success instead of failure for large prepared statements.
This commit is contained in:
parent
dc4086b182
commit
d6c44aaf52
@ -355,6 +355,7 @@ typedef struct
|
||||
bool collect_result; /*< Collect the next result set as one buffer */
|
||||
bool changing_user;
|
||||
uint32_t num_eof_packets; /*< Encountered eof packet number, used for check packet type */
|
||||
bool large_query; /*< Whether to ignore the command byte of the next packet*/
|
||||
#if defined(SS_DEBUG)
|
||||
skygw_chk_t protocol_chk_tail;
|
||||
#endif
|
||||
|
@ -33,7 +33,7 @@ int main(int argc, char** argv)
|
||||
test.maxscales->connect();
|
||||
|
||||
MYSQL_STMT* stmt = mysql_stmt_init(test.maxscales->conn_rwsplit[0]);
|
||||
test.assert(mysql_stmt_prepare(stmt, sqlstr, strlen(sqlstr)) != 0, "Prepare should fail in 2.2 but not hang",
|
||||
test.assert(mysql_stmt_prepare(stmt, sqlstr, strlen(sqlstr)) == 0, "Prepare should not fail",
|
||||
mysql_stmt_error(stmt));
|
||||
mysql_stmt_close(stmt);
|
||||
|
||||
|
@ -409,7 +409,19 @@ static inline void prepare_for_write(DCB *dcb, GWBUF *buffer)
|
||||
*/
|
||||
if (rcap_type_required(capabilities, RCAP_TYPE_STMT_INPUT))
|
||||
{
|
||||
proto->current_command = (mxs_mysql_cmd_t)MYSQL_GET_COMMAND(GWBUF_DATA(buffer));
|
||||
uint8_t* data = GWBUF_DATA(buffer);
|
||||
|
||||
if (!proto->large_query)
|
||||
{
|
||||
proto->current_command = (mxs_mysql_cmd_t)MYSQL_GET_COMMAND(data);
|
||||
}
|
||||
|
||||
/**
|
||||
* If the buffer contains a large query, we have to skip the command
|
||||
* byte extraction for the next packet. This way current_command always
|
||||
* contains the latest command executed on this backend.
|
||||
*/
|
||||
proto->large_query = MYSQL_GET_PAYLOAD_LEN(data) == MYSQL_PACKET_LENGTH_MAX;
|
||||
}
|
||||
else if (dcb->session->client_dcb && dcb->session->client_dcb->protocol)
|
||||
{
|
||||
|
@ -73,6 +73,7 @@ MySQLProtocol* mysql_protocol_init(DCB* dcb, int fd)
|
||||
p->collect_result = false;
|
||||
p->changing_user = false;
|
||||
p->num_eof_packets = 0;
|
||||
p->large_query = false;
|
||||
#if defined(SS_DEBUG)
|
||||
p->protocol_chk_top = CHK_NUM_PROTOCOL;
|
||||
p->protocol_chk_tail = CHK_NUM_PROTOCOL;
|
||||
|
@ -35,6 +35,11 @@ bool RWBackend::execute_session_command()
|
||||
return rval;
|
||||
}
|
||||
|
||||
bool RWBackend::continue_session_command(GWBUF* buffer)
|
||||
{
|
||||
return Backend::write(buffer, NO_RESPONSE);
|
||||
}
|
||||
|
||||
void RWBackend::add_ps_handle(uint32_t id, uint32_t handle)
|
||||
{
|
||||
m_ps_handles[id] = handle;
|
||||
|
@ -64,6 +64,7 @@ public:
|
||||
uint32_t get_ps_handle(uint32_t id) const;
|
||||
|
||||
bool execute_session_command();
|
||||
bool continue_session_command(GWBUF* buffer);
|
||||
bool write(GWBUF* buffer, response_type type = EXPECT_RESPONSE);
|
||||
void close(close_type type = CLOSE_NORMAL);
|
||||
|
||||
|
@ -83,6 +83,7 @@ bool RWSplitSession::handle_target_is_all(route_target_t route_target, GWBUF *qu
|
||||
int packet_type, uint32_t qtype)
|
||||
{
|
||||
bool result = false;
|
||||
bool is_large = is_large_query(querybuf);
|
||||
|
||||
if (TARGET_IS_MASTER(route_target) || TARGET_IS_SLAVE(route_target))
|
||||
{
|
||||
@ -111,13 +112,23 @@ bool RWSplitSession::handle_target_is_all(route_target_t route_target, GWBUF *qu
|
||||
MXS_FREE(query_str);
|
||||
MXS_FREE(qtype_str);
|
||||
}
|
||||
else if (m_qc.large_query())
|
||||
{
|
||||
// TODO: Append to the already stored session command instead of disabling history
|
||||
MXS_INFO("Large session write, have to disable session command history");
|
||||
m_config.disable_sescmd_history = true;
|
||||
|
||||
continue_large_session_write(querybuf, qtype);
|
||||
result = true;
|
||||
}
|
||||
else if (route_session_write(gwbuf_clone(querybuf), packet_type, qtype))
|
||||
{
|
||||
|
||||
result = true;
|
||||
atomic_add_uint64(&m_router->stats().n_all, 1);
|
||||
}
|
||||
|
||||
m_qc.set_large_query(is_large);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@ -159,7 +159,6 @@ bool RWSplitSession::route_single_stmt(GWBUF *querybuf)
|
||||
|
||||
if (TARGET_IS_ALL(route_target))
|
||||
{
|
||||
// TODO: Handle payloads larger than (2^24 - 1) bytes that are routed to all servers
|
||||
succp = handle_target_is_all(route_target, querybuf, command, qtype);
|
||||
}
|
||||
else
|
||||
@ -263,18 +262,6 @@ bool RWSplitSession::route_single_stmt(GWBUF *querybuf)
|
||||
return succp;
|
||||
}
|
||||
|
||||
static inline bool is_large_query(GWBUF* buf)
|
||||
{
|
||||
uint32_t buflen = gwbuf_length(buf);
|
||||
|
||||
// The buffer should contain at most (2^24 - 1) + 4 bytes ...
|
||||
ss_dassert(buflen <= MYSQL_HEADER_LEN + GW_MYSQL_MAX_PACKET_LEN);
|
||||
// ... and the payload should be buflen - 4 bytes
|
||||
ss_dassert(MYSQL_GET_PAYLOAD_LEN(GWBUF_DATA(buf)) == buflen - MYSQL_HEADER_LEN);
|
||||
|
||||
return buflen == MYSQL_HEADER_LEN + GW_MYSQL_MAX_PACKET_LEN;
|
||||
}
|
||||
|
||||
/**
|
||||
* Purge session command history
|
||||
*
|
||||
@ -322,6 +309,19 @@ void RWSplitSession::purge_history(mxs::SSessionCommand& sescmd)
|
||||
}
|
||||
}
|
||||
|
||||
void RWSplitSession::continue_large_session_write(GWBUF *querybuf, uint32_t type)
|
||||
{
|
||||
for (auto it = m_backends.begin(); it != m_backends.end(); it++)
|
||||
{
|
||||
SRWBackend& backend = *it;
|
||||
|
||||
if (backend->in_use())
|
||||
{
|
||||
backend->continue_session_command(gwbuf_clone(querybuf));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute in backends used by current router session.
|
||||
* Save session variable commands to router session property
|
||||
@ -343,13 +343,6 @@ void RWSplitSession::purge_history(mxs::SSessionCommand& sescmd)
|
||||
*/
|
||||
bool RWSplitSession::route_session_write(GWBUF *querybuf, uint8_t command, uint32_t type)
|
||||
{
|
||||
if (is_large_query(querybuf))
|
||||
{
|
||||
MXS_ERROR("Session command is too large, session cannot continue. "
|
||||
"Large session commands are not supported in 2.2.");
|
||||
return false;
|
||||
}
|
||||
|
||||
/** The SessionCommand takes ownership of the buffer */
|
||||
uint64_t id = m_sescmd_count++;
|
||||
mxs::SSessionCommand sescmd(new mxs::SessionCommand(querybuf, id));
|
||||
|
@ -141,7 +141,10 @@ int32_t RWSplitSession::routeQuery(GWBUF* querybuf)
|
||||
current_target = QueryClassifier::CURRENT_TARGET_SLAVE;
|
||||
}
|
||||
|
||||
m_qc.update_route_info(current_target, querybuf);
|
||||
if (!m_qc.large_query())
|
||||
{
|
||||
m_qc.update_route_info(current_target, querybuf);
|
||||
}
|
||||
|
||||
/** No active or pending queries */
|
||||
if (route_single_stmt(querybuf))
|
||||
|
@ -151,6 +151,7 @@ private:
|
||||
void purge_history(mxs::SSessionCommand& sescmd);
|
||||
|
||||
bool route_session_write(GWBUF *querybuf, uint8_t command, uint32_t type);
|
||||
void continue_large_session_write(GWBUF *querybuf, uint32_t type);
|
||||
bool route_single_stmt(GWBUF *querybuf);
|
||||
bool route_stored_query();
|
||||
|
||||
@ -217,6 +218,18 @@ private:
|
||||
{
|
||||
return !m_config.disable_sescmd_history || m_recv_sescmd == 0;
|
||||
}
|
||||
|
||||
inline bool is_large_query(GWBUF* buf)
|
||||
{
|
||||
uint32_t buflen = gwbuf_length(buf);
|
||||
|
||||
// The buffer should contain at most (2^24 - 1) + 4 bytes ...
|
||||
ss_dassert(buflen <= MYSQL_HEADER_LEN + GW_MYSQL_MAX_PACKET_LEN);
|
||||
// ... and the payload should be buflen - 4 bytes
|
||||
ss_dassert(MYSQL_GET_PAYLOAD_LEN(GWBUF_DATA(buf)) == buflen - MYSQL_HEADER_LEN);
|
||||
|
||||
return buflen == MYSQL_HEADER_LEN + GW_MYSQL_MAX_PACKET_LEN;
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
|
Loading…
x
Reference in New Issue
Block a user