MXS-1625 Move load data state to QueryClassifier
Still managed from the outside, but eventually will be managed entirelly by QueryClassifier.
This commit is contained in:
@ -34,18 +34,34 @@ public:
|
|||||||
TARGET_ALL = 0x08
|
TARGET_ALL = 0x08
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/** States of a LOAD DATA LOCAL INFILE */
|
||||||
|
enum load_data_state_t
|
||||||
|
{
|
||||||
|
LOAD_DATA_INACTIVE, /**< Not active */
|
||||||
|
LOAD_DATA_START, /**< Current query starts a load */
|
||||||
|
LOAD_DATA_ACTIVE, /**< Load is active */
|
||||||
|
LOAD_DATA_END /**< Current query contains an empty packet that ends the load */
|
||||||
|
};
|
||||||
|
|
||||||
QueryClassifier(MXS_SESSION* pSession,
|
QueryClassifier(MXS_SESSION* pSession,
|
||||||
mxs_target_t use_sql_variables_in);
|
mxs_target_t use_sql_variables_in);
|
||||||
|
|
||||||
void set_load_active(bool active);
|
load_data_state_t load_data_state() const
|
||||||
bool load_active() const;
|
{
|
||||||
|
return m_load_data_state;
|
||||||
|
}
|
||||||
|
|
||||||
|
void set_load_data_state(load_data_state_t state)
|
||||||
|
{
|
||||||
|
m_load_data_state = state;
|
||||||
|
}
|
||||||
|
|
||||||
uint32_t get_route_target(uint8_t command, uint32_t qtype);
|
uint32_t get_route_target(uint8_t command, uint32_t qtype);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
MXS_SESSION* m_pSession;
|
MXS_SESSION* m_pSession;
|
||||||
mxs_target_t m_use_sql_variables_in;
|
mxs_target_t m_use_sql_variables_in;
|
||||||
bool m_load_active;
|
load_data_state_t m_load_data_state;
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -22,24 +22,15 @@ QueryClassifier::QueryClassifier(MXS_SESSION* pSession,
|
|||||||
mxs_target_t use_sql_variables_in)
|
mxs_target_t use_sql_variables_in)
|
||||||
: m_pSession(pSession)
|
: m_pSession(pSession)
|
||||||
, m_use_sql_variables_in(use_sql_variables_in)
|
, m_use_sql_variables_in(use_sql_variables_in)
|
||||||
, m_load_active(false)
|
, m_load_data_state(LOAD_DATA_INACTIVE)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
void QueryClassifier::set_load_active(bool active)
|
|
||||||
{
|
|
||||||
m_load_active = active;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool QueryClassifier::load_active() const
|
|
||||||
{
|
|
||||||
return m_load_active;
|
|
||||||
}
|
|
||||||
|
|
||||||
uint32_t QueryClassifier::get_route_target(uint8_t command, uint32_t qtype)
|
uint32_t QueryClassifier::get_route_target(uint8_t command, uint32_t qtype)
|
||||||
{
|
{
|
||||||
bool trx_active = session_trx_is_active(m_pSession);
|
bool trx_active = session_trx_is_active(m_pSession);
|
||||||
uint32_t target = TARGET_UNDEFINED;
|
uint32_t target = TARGET_UNDEFINED;
|
||||||
|
bool load_active = (m_load_data_state != LOAD_DATA_INACTIVE);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Prepared statements preparations should go to all servers
|
* Prepared statements preparations should go to all servers
|
||||||
@ -54,7 +45,7 @@ uint32_t QueryClassifier::get_route_target(uint8_t command, uint32_t qtype)
|
|||||||
/**
|
/**
|
||||||
* These queries should be routed to all servers
|
* These queries should be routed to all servers
|
||||||
*/
|
*/
|
||||||
else if (!m_load_active &&
|
else if (!load_active &&
|
||||||
(qc_query_is_type(qtype, QUERY_TYPE_SESSION_WRITE) ||
|
(qc_query_is_type(qtype, QUERY_TYPE_SESSION_WRITE) ||
|
||||||
/** Configured to allow writing user variables to all nodes */
|
/** Configured to allow writing user variables to all nodes */
|
||||||
(m_use_sql_variables_in == TYPE_ALL &&
|
(m_use_sql_variables_in == TYPE_ALL &&
|
||||||
@ -96,7 +87,7 @@ uint32_t QueryClassifier::get_route_target(uint8_t command, uint32_t qtype)
|
|||||||
/**
|
/**
|
||||||
* Hints may affect on routing of the following queries
|
* Hints may affect on routing of the following queries
|
||||||
*/
|
*/
|
||||||
else if (!trx_active && !m_load_active &&
|
else if (!trx_active && !load_active &&
|
||||||
!qc_query_is_type(qtype, QUERY_TYPE_MASTER_READ) &&
|
!qc_query_is_type(qtype, QUERY_TYPE_MASTER_READ) &&
|
||||||
!qc_query_is_type(qtype, QUERY_TYPE_WRITE) &&
|
!qc_query_is_type(qtype, QUERY_TYPE_WRITE) &&
|
||||||
(qc_query_is_type(qtype, QUERY_TYPE_READ) ||
|
(qc_query_is_type(qtype, QUERY_TYPE_READ) ||
|
||||||
@ -133,7 +124,7 @@ uint32_t QueryClassifier::get_route_target(uint8_t command, uint32_t qtype)
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
ss_dassert(trx_active || m_load_active ||
|
ss_dassert(trx_active || load_active ||
|
||||||
(qc_query_is_type(qtype, QUERY_TYPE_WRITE) ||
|
(qc_query_is_type(qtype, QUERY_TYPE_WRITE) ||
|
||||||
qc_query_is_type(qtype, QUERY_TYPE_MASTER_READ) ||
|
qc_query_is_type(qtype, QUERY_TYPE_MASTER_READ) ||
|
||||||
qc_query_is_type(qtype, QUERY_TYPE_SESSION_WRITE) ||
|
qc_query_is_type(qtype, QUERY_TYPE_SESSION_WRITE) ||
|
||||||
|
@ -305,6 +305,88 @@ RWSplitSession* RWSplit::newSession(MXS_SESSION *session)
|
|||||||
return rses;
|
return rses;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
<<<<<<< 563fa2c840c70eaf98fc61289ab28db3aaf49932
|
||||||
|
=======
|
||||||
|
/**
|
||||||
|
* @brief Close a router session
|
||||||
|
*
|
||||||
|
* Close a session with the router, this is the mechanism by which a router
|
||||||
|
* may perform cleanup. The instance of the router that relates to
|
||||||
|
* the relevant service is passed, along with the router session that is to
|
||||||
|
* be closed. The freeSession will be called once the session has been closed.
|
||||||
|
*
|
||||||
|
* @param instance The router instance data
|
||||||
|
* @param session The router session being closed
|
||||||
|
*/
|
||||||
|
void RWSplitSession::close()
|
||||||
|
{
|
||||||
|
close_all_connections(backends);
|
||||||
|
|
||||||
|
if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO) &&
|
||||||
|
sescmd_list.size())
|
||||||
|
{
|
||||||
|
std::string sescmdstr;
|
||||||
|
|
||||||
|
for (mxs::SessionCommandList::iterator it = sescmd_list.begin();
|
||||||
|
it != sescmd_list.end(); it++)
|
||||||
|
{
|
||||||
|
mxs::SSessionCommand& scmd = *it;
|
||||||
|
sescmdstr += scmd->to_string();
|
||||||
|
sescmdstr += "\n";
|
||||||
|
}
|
||||||
|
|
||||||
|
MXS_INFO("Executed session commands:\n%s", sescmdstr.c_str());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t RWSplitSession::routeQuery(GWBUF* querybuf)
|
||||||
|
{
|
||||||
|
int rval = 0;
|
||||||
|
|
||||||
|
if (query_queue == NULL &&
|
||||||
|
(expected_responses == 0 ||
|
||||||
|
mxs_mysql_get_command(querybuf) == MXS_COM_STMT_FETCH ||
|
||||||
|
m_qc.load_data_state() == QueryClassifier::LOAD_DATA_ACTIVE ||
|
||||||
|
large_query))
|
||||||
|
{
|
||||||
|
/** Gather the information required to make routing decisions */
|
||||||
|
RouteInfo info(this, querybuf);
|
||||||
|
|
||||||
|
/** No active or pending queries */
|
||||||
|
if (route_single_stmt(querybuf, info))
|
||||||
|
{
|
||||||
|
rval = 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* We are already processing a request from the client. Store the
|
||||||
|
* new query and wait for the previous one to complete.
|
||||||
|
*/
|
||||||
|
ss_dassert(expected_responses || query_queue);
|
||||||
|
MXS_INFO("Storing query (len: %d cmd: %0x), expecting %d replies to current command",
|
||||||
|
gwbuf_length(querybuf), GWBUF_DATA(querybuf)[4], expected_responses);
|
||||||
|
query_queue = gwbuf_append(query_queue, querybuf);
|
||||||
|
querybuf = NULL;
|
||||||
|
rval = 1;
|
||||||
|
ss_dassert(expected_responses > 0);
|
||||||
|
|
||||||
|
if (expected_responses == 0 && !route_stored_query())
|
||||||
|
{
|
||||||
|
rval = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (querybuf != NULL)
|
||||||
|
{
|
||||||
|
gwbuf_free(querybuf);
|
||||||
|
}
|
||||||
|
|
||||||
|
return rval;
|
||||||
|
}
|
||||||
|
|
||||||
|
>>>>>>> MXS-1625 Move load data state to QueryClassifier
|
||||||
void RWSplit::diagnostics(DCB *dcb)
|
void RWSplit::diagnostics(DCB *dcb)
|
||||||
{
|
{
|
||||||
RWSplit *router = this;
|
RWSplit *router = this;
|
||||||
|
@ -113,15 +113,6 @@ static const MXS_ENUM_VALUE master_failure_mode_values[] =
|
|||||||
{NULL}
|
{NULL}
|
||||||
};
|
};
|
||||||
|
|
||||||
/** States of a LOAD DATA LOCAL INFILE */
|
|
||||||
enum ld_state
|
|
||||||
{
|
|
||||||
LOAD_DATA_INACTIVE, /**< Not active */
|
|
||||||
LOAD_DATA_START, /**< Current query starts a load */
|
|
||||||
LOAD_DATA_ACTIVE, /**< Load is active */
|
|
||||||
LOAD_DATA_END /**< Current query contains an empty packet that ends the load */
|
|
||||||
};
|
|
||||||
|
|
||||||
#define TARGET_IS_MASTER(t) (t & TARGET_MASTER)
|
#define TARGET_IS_MASTER(t) (t & TARGET_MASTER)
|
||||||
#define TARGET_IS_SLAVE(t) (t & TARGET_SLAVE)
|
#define TARGET_IS_SLAVE(t) (t & TARGET_SLAVE)
|
||||||
#define TARGET_IS_NAMED_SERVER(t) (t & TARGET_NAMED_SERVER)
|
#define TARGET_IS_NAMED_SERVER(t) (t & TARGET_NAMED_SERVER)
|
||||||
|
@ -18,6 +18,8 @@
|
|||||||
|
|
||||||
#define RWSPLIT_TRACE_MSG_LEN 1000
|
#define RWSPLIT_TRACE_MSG_LEN 1000
|
||||||
|
|
||||||
|
using mxs::QueryClassifier;
|
||||||
|
|
||||||
namespace
|
namespace
|
||||||
{
|
{
|
||||||
|
|
||||||
@ -26,7 +28,6 @@ namespace
|
|||||||
* target for query routing.
|
* target for query routing.
|
||||||
*
|
*
|
||||||
* @param qc The query classifier.
|
* @param qc The query classifier.
|
||||||
* @param load_active Whether LOAD DATA is on going.
|
|
||||||
* @param command The current command.
|
* @param command The current command.
|
||||||
* @param qtype Type of query
|
* @param qtype Type of query
|
||||||
* @param query_hints Pointer to list of hints attached to the query buffer
|
* @param query_hints Pointer to list of hints attached to the query buffer
|
||||||
@ -35,13 +36,10 @@ namespace
|
|||||||
* if the query would otherwise be routed to slave.
|
* if the query would otherwise be routed to slave.
|
||||||
*/
|
*/
|
||||||
route_target_t get_route_target(mxs::QueryClassifier& qc,
|
route_target_t get_route_target(mxs::QueryClassifier& qc,
|
||||||
bool load_active,
|
|
||||||
uint8_t command,
|
uint8_t command,
|
||||||
uint32_t qtype,
|
uint32_t qtype,
|
||||||
HINT *query_hints)
|
HINT *query_hints)
|
||||||
{
|
{
|
||||||
qc.set_load_active(load_active);
|
|
||||||
|
|
||||||
int target = qc.get_route_target(command, qtype);
|
int target = qc.get_route_target(command, qtype);
|
||||||
|
|
||||||
/** Process routing hints */
|
/** Process routing hints */
|
||||||
@ -114,7 +112,7 @@ log_transaction_status(RWSplitSession *rses, GWBUF *querybuf, uint32_t qtype)
|
|||||||
{
|
{
|
||||||
MXS_INFO("> Processing large request with more than 2^24 bytes of data");
|
MXS_INFO("> Processing large request with more than 2^24 bytes of data");
|
||||||
}
|
}
|
||||||
else if (rses->m_load_data_state == LOAD_DATA_INACTIVE)
|
else if (rses->qc().load_data_state() == QueryClassifier::LOAD_DATA_INACTIVE)
|
||||||
{
|
{
|
||||||
uint8_t *packet = GWBUF_DATA(querybuf);
|
uint8_t *packet = GWBUF_DATA(querybuf);
|
||||||
unsigned char command = packet[4];
|
unsigned char command = packet[4];
|
||||||
@ -522,7 +520,7 @@ handle_multi_temp_and_load(RWSplitSession *rses, GWBUF *querybuf,
|
|||||||
* Check if this is a LOAD DATA LOCAL INFILE query. If so, send all queries
|
* Check if this is a LOAD DATA LOCAL INFILE query. If so, send all queries
|
||||||
* to the master until the last, empty packet arrives.
|
* to the master until the last, empty packet arrives.
|
||||||
*/
|
*/
|
||||||
if (rses->m_load_data_state == LOAD_DATA_ACTIVE)
|
if (rses->qc().load_data_state() == QueryClassifier::LOAD_DATA_ACTIVE)
|
||||||
{
|
{
|
||||||
rses->m_load_data_sent += gwbuf_length(querybuf);
|
rses->m_load_data_sent += gwbuf_length(querybuf);
|
||||||
}
|
}
|
||||||
@ -531,8 +529,8 @@ handle_multi_temp_and_load(RWSplitSession *rses, GWBUF *querybuf,
|
|||||||
qc_query_op_t queryop = qc_get_operation(querybuf);
|
qc_query_op_t queryop = qc_get_operation(querybuf);
|
||||||
if (queryop == QUERY_OP_LOAD)
|
if (queryop == QUERY_OP_LOAD)
|
||||||
{
|
{
|
||||||
rses->m_load_data_state = LOAD_DATA_START;
|
rses->qc().set_load_data_state(QueryClassifier::LOAD_DATA_START);
|
||||||
rses->m_load_data_sent = 0;
|
rses->rses_load_data_sent = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -621,17 +619,13 @@ route_target_t get_target_type(RWSplitSession *rses, GWBUF *buffer,
|
|||||||
*type = rses->m_ps_manager.get_type(*stmt_id);
|
*type = rses->m_ps_manager.get_type(*stmt_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool load_active = (rses->load_data_state != LOAD_DATA_INACTIVE);
|
route_target = get_route_target(rses->qc(), *command, *type, buffer->hint);
|
||||||
|
|
||||||
route_target = get_route_target(rses->qc(),
|
|
||||||
load_active,
|
|
||||||
*command, *type, buffer->hint);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
/** Empty packet signals end of LOAD DATA LOCAL INFILE, send it to master*/
|
/** Empty packet signals end of LOAD DATA LOCAL INFILE, send it to master*/
|
||||||
rses->m_load_data_state = LOAD_DATA_END;
|
rses->qc().set_load_data_state(QueryClassifier::LOAD_DATA_END);
|
||||||
MXS_INFO("> LOAD DATA LOCAL INFILE finished: %lu bytes sent.",
|
MXS_INFO("> LOAD DATA LOCAL INFILE finished: %lu bytes sent.",
|
||||||
rses->m_load_data_sent + gwbuf_length(buffer));
|
rses->m_load_data_sent + gwbuf_length(buffer));
|
||||||
}
|
}
|
||||||
|
@ -936,7 +936,7 @@ bool RWSplitSession::handle_got_target(GWBUF* querybuf, SRWBackend& target, bool
|
|||||||
m_wait_gtid_state = EXPECTING_WAIT_GTID_RESULT;
|
m_wait_gtid_state = EXPECTING_WAIT_GTID_RESULT;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (m_load_data_state != LOAD_DATA_ACTIVE &&
|
if (m_qc.load_data_state() != QueryClassifier::LOAD_DATA_ACTIVE &&
|
||||||
mxs_mysql_command_will_respond(cmd))
|
mxs_mysql_command_will_respond(cmd))
|
||||||
{
|
{
|
||||||
response = mxs::Backend::EXPECT_RESPONSE;
|
response = mxs::Backend::EXPECT_RESPONSE;
|
||||||
@ -960,18 +960,18 @@ bool RWSplitSession::handle_got_target(GWBUF* querybuf, SRWBackend& target, bool
|
|||||||
target->set_reply_state(REPLY_STATE_START);
|
target->set_reply_state(REPLY_STATE_START);
|
||||||
m_expected_responses++;
|
m_expected_responses++;
|
||||||
|
|
||||||
if (m_load_data_state == LOAD_DATA_START)
|
if (m_qc.load_data_state() == QueryClassifier::LOAD_DATA_START)
|
||||||
{
|
{
|
||||||
/** The first packet contains the actual query and the server
|
/** The first packet contains the actual query and the server
|
||||||
* will respond to it */
|
* will respond to it */
|
||||||
m_load_data_state = LOAD_DATA_ACTIVE;
|
m_qc.set_load_data_state(QueryClassifier::LOAD_DATA_ACTIVE);
|
||||||
}
|
}
|
||||||
else if (m_load_data_state == LOAD_DATA_END)
|
else if (m_qc.load_data_state() == QueryClassifier::LOAD_DATA_END)
|
||||||
{
|
{
|
||||||
/** The final packet in a LOAD DATA LOCAL INFILE is an empty packet
|
/** The final packet in a LOAD DATA LOCAL INFILE is an empty packet
|
||||||
* to which the server responds with an OK or an ERR packet */
|
* to which the server responds with an OK or an ERR packet */
|
||||||
ss_dassert(gwbuf_length(querybuf) == 4);
|
ss_dassert(gwbuf_length(querybuf) == 4);
|
||||||
m_load_data_state = LOAD_DATA_INACTIVE;
|
m_qc.set_load_data_state(QueryClassifier::LOAD_DATA_INACTIVE);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -27,7 +27,6 @@ RWSplitSession::RWSplitSession(RWSplit* instance, MXS_SESSION* session,
|
|||||||
m_large_query(false),
|
m_large_query(false),
|
||||||
m_config(instance->config()),
|
m_config(instance->config()),
|
||||||
m_nbackends(instance->service()->n_dbref),
|
m_nbackends(instance->service()->n_dbref),
|
||||||
m_load_data_state(LOAD_DATA_INACTIVE),
|
|
||||||
m_have_tmp_tables(false),
|
m_have_tmp_tables(false),
|
||||||
m_load_data_sent(0),
|
m_load_data_sent(0),
|
||||||
m_client(session->client_dcb),
|
m_client(session->client_dcb),
|
||||||
|
@ -107,7 +107,6 @@ public:
|
|||||||
bool m_large_query; /**< Set to true when processing payloads >= 2^24 bytes */
|
bool m_large_query; /**< Set to true when processing payloads >= 2^24 bytes */
|
||||||
Config m_config; /**< copied config info from router instance */
|
Config m_config; /**< copied config info from router instance */
|
||||||
int m_nbackends; /**< Number of backend servers (obsolete) */
|
int m_nbackends; /**< Number of backend servers (obsolete) */
|
||||||
enum ld_state m_load_data_state; /**< Current load data state */
|
|
||||||
bool m_have_tmp_tables; /**< True if temp tables have been created */
|
bool m_have_tmp_tables; /**< True if temp tables have been created */
|
||||||
uint64_t m_load_data_sent; /**< How much data has been sent */
|
uint64_t m_load_data_sent; /**< How much data has been sent */
|
||||||
DCB* m_client; /**< The client DCB */
|
DCB* m_client; /**< The client DCB */
|
||||||
|
Reference in New Issue
Block a user