Add delayed query retry prototype
This is a proof-of-concept that validates the query retrying method. The actual implementation of the query retrying mechanism needs more thought as using the housekeeper is not very efficient.
This commit is contained in:
@ -673,4 +673,16 @@ void session_set_dump_statements(session_dump_statements_t value);
|
|||||||
*/
|
*/
|
||||||
session_dump_statements_t session_get_dump_statements();
|
session_dump_statements_t session_get_dump_statements();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Route the query again after a delay
|
||||||
|
*
|
||||||
|
* @param session The current Session
|
||||||
|
* @param down The downstream component, either a filter or a router
|
||||||
|
* @param buffer The buffer to route
|
||||||
|
* @param seconds Number of seconds to wait before routing the query
|
||||||
|
*
|
||||||
|
* @return True if queuing of the query was successful
|
||||||
|
*/
|
||||||
|
bool session_delay_routing(MXS_SESSION* session, MXS_DOWNSTREAM down, GWBUF* buffer, int seconds);
|
||||||
|
|
||||||
MXS_END_DECLS
|
MXS_END_DECLS
|
||||||
|
@ -47,6 +47,7 @@
|
|||||||
|
|
||||||
using std::string;
|
using std::string;
|
||||||
using std::stringstream;
|
using std::stringstream;
|
||||||
|
using namespace maxscale;
|
||||||
|
|
||||||
/** Global session id counter. Must be updated atomically. Value 0 is reserved for
|
/** Global session id counter. Must be updated atomically. Value 0 is reserved for
|
||||||
* dummy/unused sessions.
|
* dummy/unused sessions.
|
||||||
@ -1378,3 +1379,93 @@ void session_dump_statements(MXS_SESSION* pSession)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class DelayedRoutingTask: public mxs::WorkerDisposableTask
|
||||||
|
{
|
||||||
|
DelayedRoutingTask(const DelayedRoutingTask&);
|
||||||
|
DelayedRoutingTask& operator=(const DelayedRoutingTask&);
|
||||||
|
|
||||||
|
public:
|
||||||
|
DelayedRoutingTask(MXS_SESSION* session, MXS_DOWNSTREAM down, GWBUF *buffer):
|
||||||
|
m_session(session_get_ref(session)),
|
||||||
|
m_down(down),
|
||||||
|
m_buffer(buffer)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
~DelayedRoutingTask()
|
||||||
|
{
|
||||||
|
session_put_ref(m_session);
|
||||||
|
gwbuf_free(m_buffer);
|
||||||
|
}
|
||||||
|
|
||||||
|
void execute(Worker& worker)
|
||||||
|
{
|
||||||
|
if (m_session->state == SESSION_STATE_ROUTER_READY)
|
||||||
|
{
|
||||||
|
GWBUF* buffer = m_buffer;
|
||||||
|
m_buffer = NULL;
|
||||||
|
|
||||||
|
if (m_down.routeQuery(m_down.instance, m_down.session, buffer) == 0)
|
||||||
|
{
|
||||||
|
// Routing failed, send a hangup to the client.
|
||||||
|
poll_fake_hangup_event(m_session->client_dcb);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
MXS_SESSION* m_session;
|
||||||
|
MXS_DOWNSTREAM m_down;
|
||||||
|
GWBUF* m_buffer;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct TaskAssignment
|
||||||
|
{
|
||||||
|
TaskAssignment(std::auto_ptr<DelayedRoutingTask> task, Worker* worker):
|
||||||
|
task(task),
|
||||||
|
worker(worker)
|
||||||
|
{}
|
||||||
|
|
||||||
|
std::auto_ptr<DelayedRoutingTask> task;
|
||||||
|
Worker* worker;
|
||||||
|
};
|
||||||
|
|
||||||
|
static void delayed_routing_cb(void* data)
|
||||||
|
{
|
||||||
|
TaskAssignment* job = static_cast<TaskAssignment*>(data);
|
||||||
|
job->worker->post(job->task, mxs::Worker::EXECUTE_QUEUED);
|
||||||
|
delete job;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool session_delay_routing(MXS_SESSION* session, MXS_DOWNSTREAM down, GWBUF* buffer, int seconds)
|
||||||
|
{
|
||||||
|
bool success = false;
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
std::stringstream name;
|
||||||
|
name << "Session_" << session->ses_id << "_retry";
|
||||||
|
|
||||||
|
Worker* worker = Worker::get_current();
|
||||||
|
ss_dassert(worker == Worker::get(session->client_dcb->poll.thread.id));
|
||||||
|
std::auto_ptr<DelayedRoutingTask> task(new DelayedRoutingTask(session, down, buffer));
|
||||||
|
std::auto_ptr<TaskAssignment> job(new TaskAssignment(task, worker));
|
||||||
|
TaskAssignment* pJob = job.release();
|
||||||
|
|
||||||
|
if (hktask_oneshot(name.str().c_str(), delayed_routing_cb, pJob, seconds))
|
||||||
|
{
|
||||||
|
success = true;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
delete pJob;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (std::bad_alloc)
|
||||||
|
{
|
||||||
|
MXS_OOM();
|
||||||
|
}
|
||||||
|
|
||||||
|
return success;
|
||||||
|
}
|
||||||
|
@ -477,6 +477,8 @@ MXS_MODULE *MXS_CREATE_MODULE()
|
|||||||
{"enable_causal_read", MXS_MODULE_PARAM_BOOL, "false"},
|
{"enable_causal_read", MXS_MODULE_PARAM_BOOL, "false"},
|
||||||
{"causal_read_timeout", MXS_MODULE_PARAM_STRING, "0"},
|
{"causal_read_timeout", MXS_MODULE_PARAM_STRING, "0"},
|
||||||
{"master_reconnection", MXS_MODULE_PARAM_BOOL, "false"},
|
{"master_reconnection", MXS_MODULE_PARAM_BOOL, "false"},
|
||||||
|
{"query_retry_timeout", MXS_MODULE_PARAM_COUNT, "10"},
|
||||||
|
{"query_retry_interval", MXS_MODULE_PARAM_COUNT, "0"},
|
||||||
{MXS_END_MODULE_PARAMS}
|
{MXS_END_MODULE_PARAMS}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -173,7 +173,9 @@ struct Config
|
|||||||
max_slave_connections(0),
|
max_slave_connections(0),
|
||||||
enable_causal_read(config_get_bool(params, "enable_causal_read")),
|
enable_causal_read(config_get_bool(params, "enable_causal_read")),
|
||||||
causal_read_timeout(config_get_string(params, "causal_read_timeout")),
|
causal_read_timeout(config_get_string(params, "causal_read_timeout")),
|
||||||
master_reconnection(config_get_bool(params, "master_reconnection"))
|
master_reconnection(config_get_bool(params, "master_reconnection")),
|
||||||
|
query_retry_timeout(config_get_integer(params, "query_retry_timeout")),
|
||||||
|
query_retry_interval(config_get_integer(params, "query_retry_interval"))
|
||||||
{
|
{
|
||||||
if (enable_causal_read)
|
if (enable_causal_read)
|
||||||
{
|
{
|
||||||
@ -199,8 +201,10 @@ struct Config
|
|||||||
* each connection*/
|
* each connection*/
|
||||||
int max_slave_connections; /**< Maximum number of slaves for each connection*/
|
int max_slave_connections; /**< Maximum number of slaves for each connection*/
|
||||||
bool enable_causal_read; /**< Enable causual read */
|
bool enable_causal_read; /**< Enable causual read */
|
||||||
std::string causal_read_timeout; /**< Timetout, second parameter of function master_wait_gtid */
|
std::string causal_read_timeout; /**< Timeout, second parameter of function master_wait_gtid */
|
||||||
bool master_reconnection; /**< Allow changes in master server */
|
bool master_reconnection; /**< Allow changes in master server */
|
||||||
|
uint64_t query_retry_timeout; /**< Time window in which a query can be retried */
|
||||||
|
uint64_t query_retry_interval; /**< Time window in which a query can be retried */
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -205,8 +205,13 @@ bool RWSplitSession::route_single_stmt(GWBUF *querybuf, const RouteInfo& info)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
MXS_SESSION* session = m_client->session;
|
||||||
|
|
||||||
if (succp && target)
|
if (succp && target)
|
||||||
{
|
{
|
||||||
|
// We have a valid target, reset retry duration
|
||||||
|
m_retry_duration = 0;
|
||||||
|
|
||||||
if (!prepare_target(target, route_target))
|
if (!prepare_target(target, route_target))
|
||||||
{
|
{
|
||||||
// The connection to target was down and we failed to reconnect
|
// The connection to target was down and we failed to reconnect
|
||||||
@ -235,6 +240,26 @@ bool RWSplitSession::route_single_stmt(GWBUF *querybuf, const RouteInfo& info)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
else if (m_config.query_retry_interval > 0 &&
|
||||||
|
m_retry_duration < m_config.query_retry_timeout &&
|
||||||
|
!session_trx_is_active(session))
|
||||||
|
{
|
||||||
|
// This is a more convenient type
|
||||||
|
typedef int32_t (*DOWNSTREAMFUNC)(MXS_FILTER*, MXS_FILTER_SESSION*, GWBUF*);
|
||||||
|
|
||||||
|
MXS_DOWNSTREAM head;
|
||||||
|
head.instance = (MXS_FILTER*)session->service->router_instance;
|
||||||
|
head.session = (MXS_FILTER_SESSION*)session->router_session;
|
||||||
|
head.routeQuery = (DOWNSTREAMFUNC)session->service->router->routeQuery;
|
||||||
|
|
||||||
|
// Try to route the query again later
|
||||||
|
uint64_t interval = m_config.query_retry_interval;
|
||||||
|
MXS_INFO("Will try to route query again in %lu seconds", interval);
|
||||||
|
session_delay_routing(session, head, gwbuf_clone(querybuf), interval);
|
||||||
|
m_retry_duration += interval;
|
||||||
|
|
||||||
|
succp = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (succp && m_router->config().connection_keepalive &&
|
if (succp && m_router->config().connection_keepalive &&
|
||||||
|
@ -36,7 +36,8 @@ RWSplitSession::RWSplitSession(RWSplit* instance, MXS_SESSION* session,
|
|||||||
m_gtid_pos(""),
|
m_gtid_pos(""),
|
||||||
m_wait_gtid_state(EXPECTING_NOTHING),
|
m_wait_gtid_state(EXPECTING_NOTHING),
|
||||||
m_next_seq(0),
|
m_next_seq(0),
|
||||||
m_qc(this, session, instance->config().use_sql_variables_in)
|
m_qc(this, session, instance->config().use_sql_variables_in),
|
||||||
|
m_retry_duration(0)
|
||||||
{
|
{
|
||||||
if (m_config.rw_max_slave_conn_percent)
|
if (m_config.rw_max_slave_conn_percent)
|
||||||
{
|
{
|
||||||
|
@ -124,8 +124,9 @@ public:
|
|||||||
ExecMap m_exec_map; /**< Map of COM_STMT_EXECUTE statement IDs to Backends */
|
ExecMap m_exec_map; /**< Map of COM_STMT_EXECUTE statement IDs to Backends */
|
||||||
std::string m_gtid_pos; /**< Gtid position for causal read */
|
std::string m_gtid_pos; /**< Gtid position for causal read */
|
||||||
wait_gtid_state_t m_wait_gtid_state; /**< Determine boundray of wait gtid result and client query result */
|
wait_gtid_state_t m_wait_gtid_state; /**< Determine boundray of wait gtid result and client query result */
|
||||||
uint32_t m_next_seq; /**< Next packet'ssequence number */
|
uint32_t m_next_seq; /**< Next packet's sequence number */
|
||||||
mxs::QueryClassifier m_qc; /**< The query classifier. */
|
mxs::QueryClassifier m_qc; /**< The query classifier. */
|
||||||
|
uint64_t m_retry_duration; /**< Total time spent retrying queries */
|
||||||
|
|
||||||
private:
|
private:
|
||||||
RWSplitSession(RWSplit* instance, MXS_SESSION* session,
|
RWSplitSession(RWSplit* instance, MXS_SESSION* session,
|
||||||
|
Reference in New Issue
Block a user