MXS-1828: Simplify LOAD DATA LOCAL INFILE handling
By relying on the server to tell us that it is requesting the loading of a local infile, we can remove one state from the state machine that governs the loading of local files. It also removes the need to handle error and success cases separately. A side-effect of this change is that execution of multi-statement LOAD DATA LOCAL INFILE no longer hangs. This is done by checking whether the completion of one command initiates a new load. The current code recursively checks the reply state and clones the buffers. Neither of these are required nor should they be done but refactoring the code is to be done in a separate commit. Added two helper functions that are used to detect requests for local infiles and to extract the total packet length from a non-contiguous GWBUF.
This commit is contained in:
parent
35f2382263
commit
91cc5b1e89
@ -563,6 +563,15 @@ bool mxs_mysql_is_err_packet(GWBUF *buffer);
|
||||
*/
|
||||
bool mxs_mysql_is_result_set(GWBUF *buffer);
|
||||
|
||||
/**
|
||||
* @brief Check if the buffer contains a LOCAL INFILE request
|
||||
*
|
||||
* @param buffer Buffer containing a complete MySQL packet
|
||||
*
|
||||
* @return True if the buffer contains a LOCAL INFILE request
|
||||
*/
|
||||
bool mxs_mysql_is_local_infile(GWBUF *buffer);
|
||||
|
||||
/**
|
||||
* @brief Check if the buffer contains a prepared statement OK packet
|
||||
*
|
||||
@ -643,6 +652,23 @@ static inline uint8_t mxs_mysql_get_command(GWBUF* buffer)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Get the total size of the first packet
|
||||
*
|
||||
* The size includes the payload and the header
|
||||
*
|
||||
* @param buffer Buffer to inspect
|
||||
*
|
||||
* @return The total packet size in bytes
|
||||
*/
|
||||
static inline uint32_t mxs_mysql_get_packet_len(GWBUF* buffer)
|
||||
{
|
||||
// The first three bytes of the packet header contain its length
|
||||
uint8_t buf[3];
|
||||
gwbuf_copy_data(buffer, 0, 3, buf);
|
||||
return gw_mysql_get_byte3(buf) + MYSQL_HEADER_LEN;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Extract PS response values
|
||||
*
|
||||
|
@ -156,7 +156,6 @@ public:
|
||||
enum load_data_state_t
|
||||
{
|
||||
LOAD_DATA_INACTIVE, /**< Not active */
|
||||
LOAD_DATA_START, /**< Current query starts a load */
|
||||
LOAD_DATA_ACTIVE, /**< Load is active */
|
||||
LOAD_DATA_END /**< Current query contains an empty packet that ends the load */
|
||||
};
|
||||
@ -200,6 +199,12 @@ public:
|
||||
|
||||
void set_load_data_state(load_data_state_t state)
|
||||
{
|
||||
if (state == LOAD_DATA_ACTIVE)
|
||||
{
|
||||
ss_dassert(m_load_data_state == LOAD_DATA_INACTIVE);
|
||||
reset_load_data_sent();
|
||||
}
|
||||
|
||||
m_load_data_state = state;
|
||||
}
|
||||
|
||||
|
@ -817,15 +817,6 @@ QueryClassifier::handle_multi_temp_and_load(QueryClassifier::current_target_t cu
|
||||
{
|
||||
append_load_data_sent(querybuf);
|
||||
}
|
||||
else if (is_packet_a_query(packet_type))
|
||||
{
|
||||
qc_query_op_t queryop = qc_get_operation(querybuf);
|
||||
if (queryop == QUERY_OP_LOAD)
|
||||
{
|
||||
set_load_data_state(QueryClassifier::LOAD_DATA_START);
|
||||
reset_load_data_sent();
|
||||
}
|
||||
}
|
||||
|
||||
return rv;
|
||||
}
|
||||
|
@ -1582,6 +1582,13 @@ bool mxs_mysql_is_result_set(GWBUF *buffer)
|
||||
return rval;
|
||||
}
|
||||
|
||||
bool mxs_mysql_is_local_infile(GWBUF *buffer)
|
||||
{
|
||||
uint8_t cmd = 0xff; // Default should differ from the OK packet
|
||||
gwbuf_copy_data(buffer, MYSQL_HEADER_LEN, 1, &cmd);
|
||||
return cmd == MYSQL_REPLY_LOCAL_INFILE;
|
||||
}
|
||||
|
||||
bool mxs_mysql_is_prep_stmt_ok(GWBUF *buffer)
|
||||
{
|
||||
bool rval = false;
|
||||
|
@ -13,7 +13,8 @@ RWBackend::RWBackend(SERVER_REF* ref):
|
||||
m_modutil_state({}),
|
||||
m_command(0),
|
||||
m_opening_cursor(false),
|
||||
m_expected_rows(0)
|
||||
m_expected_rows(0),
|
||||
m_local_infile_requested(false)
|
||||
{
|
||||
}
|
||||
|
||||
@ -146,6 +147,8 @@ bool RWBackend::reply_is_complete(GWBUF *buffer)
|
||||
else if (get_reply_state() == REPLY_STATE_START &&
|
||||
(!mxs_mysql_is_result_set(buffer) || GWBUF_IS_COLLECTED_RESULT(buffer)))
|
||||
{
|
||||
m_local_infile_requested = false;
|
||||
|
||||
if (GWBUF_IS_COLLECTED_RESULT(buffer) ||
|
||||
current_command() == MXS_COM_STMT_PREPARE ||
|
||||
!mxs_mysql_is_ok_packet(buffer) ||
|
||||
@ -153,6 +156,11 @@ bool RWBackend::reply_is_complete(GWBUF *buffer)
|
||||
{
|
||||
/** Not a result set, we have the complete response */
|
||||
set_reply_state(REPLY_STATE_DONE);
|
||||
|
||||
if (mxs_mysql_is_local_infile(buffer))
|
||||
{
|
||||
m_local_infile_requested = true;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -162,8 +170,12 @@ bool RWBackend::reply_is_complete(GWBUF *buffer)
|
||||
|
||||
if (have_next_packet(buffer))
|
||||
{
|
||||
set_reply_state(REPLY_STATE_RSET_COLDEF);
|
||||
return reply_is_complete(buffer);
|
||||
// TODO: Don't clone the buffer
|
||||
GWBUF* tmp = gwbuf_clone(buffer);
|
||||
tmp = gwbuf_consume(tmp, mxs_mysql_get_packet_len(tmp));
|
||||
bool rval = reply_is_complete(tmp);
|
||||
gwbuf_free(tmp);
|
||||
return rval;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -76,6 +76,11 @@ public:
|
||||
return m_command;
|
||||
}
|
||||
|
||||
bool local_infile_requested() const
|
||||
{
|
||||
return m_local_infile_requested;
|
||||
}
|
||||
|
||||
bool reply_is_complete(GWBUF *buffer);
|
||||
|
||||
private:
|
||||
@ -85,6 +90,7 @@ private:
|
||||
uint8_t m_command;
|
||||
bool m_opening_cursor; /**< Whether we are opening a cursor */
|
||||
uint32_t m_expected_rows; /**< Number of rows a COM_STMT_FETCH is retrieving */
|
||||
bool m_local_infile_requested; /**< Whether a LOCAL INFILE was requested */
|
||||
|
||||
inline bool is_opening_cursor() const
|
||||
{
|
||||
|
@ -983,13 +983,7 @@ bool RWSplitSession::handle_got_target(GWBUF* querybuf, SRWBackend& target, bool
|
||||
target->set_reply_state(REPLY_STATE_START);
|
||||
m_expected_responses++;
|
||||
|
||||
if (m_qc.load_data_state() == QueryClassifier::LOAD_DATA_START)
|
||||
{
|
||||
/** The first packet contains the actual query and the server
|
||||
* will respond to it */
|
||||
m_qc.set_load_data_state(QueryClassifier::LOAD_DATA_ACTIVE);
|
||||
}
|
||||
else if (m_qc.load_data_state() == QueryClassifier::LOAD_DATA_END)
|
||||
if (m_qc.load_data_state() == QueryClassifier::LOAD_DATA_END)
|
||||
{
|
||||
/** The final packet in a LOAD DATA LOCAL INFILE is an empty packet
|
||||
* to which the server responds with an OK or an ERR packet */
|
||||
|
@ -420,13 +420,6 @@ void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb)
|
||||
|
||||
SRWBackend& backend = get_backend_from_dcb(backend_dcb);
|
||||
|
||||
if (m_qc.load_data_state() == QueryClassifier::LOAD_DATA_ACTIVE &&
|
||||
mxs_mysql_is_err_packet(writebuf))
|
||||
{
|
||||
// Server responded with an error to the LOAD DATA LOCAL INFILE
|
||||
m_qc.set_load_data_state(QueryClassifier::LOAD_DATA_INACTIVE);
|
||||
}
|
||||
|
||||
if ((writebuf = handle_causal_read_reply(writebuf, backend)) == NULL)
|
||||
{
|
||||
return; // Nothing to route, return
|
||||
@ -475,6 +468,12 @@ void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb)
|
||||
ss_dassert(m_expected_responses >= 0);
|
||||
ss_dassert(backend->get_reply_state() == REPLY_STATE_DONE);
|
||||
MXS_INFO("Reply complete, last reply from %s", backend->name());
|
||||
|
||||
if (backend->local_infile_requested())
|
||||
{
|
||||
// Server requested a local file, go into data streaming mode
|
||||
m_qc.set_load_data_state(QueryClassifier::LOAD_DATA_ACTIVE);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
|
Loading…
x
Reference in New Issue
Block a user