diff --git a/include/maxscale/session.h b/include/maxscale/session.h index 1de29355f..8f1fa18bd 100644 --- a/include/maxscale/session.h +++ b/include/maxscale/session.h @@ -673,4 +673,16 @@ void session_set_dump_statements(session_dump_statements_t value); */ session_dump_statements_t session_get_dump_statements(); +/** + * @brief Route the query again after a delay + * + * @param session The current Session + * @param down The downstream component, either a filter or a router + * @param buffer The buffer to route + * @param seconds Number of seconds to wait before routing the query + * + * @return True if queuing of the query was successful + */ +bool session_delay_routing(MXS_SESSION* session, MXS_DOWNSTREAM down, GWBUF* buffer, int seconds); + MXS_END_DECLS diff --git a/server/core/session.cc b/server/core/session.cc index dc2d828bd..eab7265ba 100644 --- a/server/core/session.cc +++ b/server/core/session.cc @@ -47,6 +47,7 @@ using std::string; using std::stringstream; +using namespace maxscale; /** Global session id counter. Must be updated atomically. Value 0 is reserved for * dummy/unused sessions. @@ -1378,3 +1379,93 @@ void session_dump_statements(MXS_SESSION* pSession) } } } + +class DelayedRoutingTask: public mxs::WorkerDisposableTask +{ + DelayedRoutingTask(const DelayedRoutingTask&); + DelayedRoutingTask& operator=(const DelayedRoutingTask&); + +public: + DelayedRoutingTask(MXS_SESSION* session, MXS_DOWNSTREAM down, GWBUF *buffer): + m_session(session_get_ref(session)), + m_down(down), + m_buffer(buffer) + { + } + + ~DelayedRoutingTask() + { + session_put_ref(m_session); + gwbuf_free(m_buffer); + } + + void execute(Worker& worker) + { + if (m_session->state == SESSION_STATE_ROUTER_READY) + { + GWBUF* buffer = m_buffer; + m_buffer = NULL; + + if (m_down.routeQuery(m_down.instance, m_down.session, buffer) == 0) + { + // Routing failed, send a hangup to the client. + poll_fake_hangup_event(m_session->client_dcb); + } + } + } + +private: + MXS_SESSION* m_session; + MXS_DOWNSTREAM m_down; + GWBUF* m_buffer; +}; + +struct TaskAssignment +{ + TaskAssignment(std::auto_ptr task, Worker* worker): + task(task), + worker(worker) + {} + + std::auto_ptr task; + Worker* worker; +}; + +static void delayed_routing_cb(void* data) +{ + TaskAssignment* job = static_cast(data); + job->worker->post(job->task, mxs::Worker::EXECUTE_QUEUED); + delete job; +} + +bool session_delay_routing(MXS_SESSION* session, MXS_DOWNSTREAM down, GWBUF* buffer, int seconds) +{ + bool success = false; + + try + { + std::stringstream name; + name << "Session_" << session->ses_id << "_retry"; + + Worker* worker = Worker::get_current(); + ss_dassert(worker == Worker::get(session->client_dcb->poll.thread.id)); + std::auto_ptr task(new DelayedRoutingTask(session, down, buffer)); + std::auto_ptr job(new TaskAssignment(task, worker)); + TaskAssignment* pJob = job.release(); + + if (hktask_oneshot(name.str().c_str(), delayed_routing_cb, pJob, seconds)) + { + success = true; + } + else + { + delete pJob; + } + } + catch (std::bad_alloc) + { + MXS_OOM(); + } + + return success; +} diff --git a/server/modules/routing/readwritesplit/readwritesplit.cc b/server/modules/routing/readwritesplit/readwritesplit.cc index a5c9c4071..5c66a2a30 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.cc +++ b/server/modules/routing/readwritesplit/readwritesplit.cc @@ -477,6 +477,8 @@ MXS_MODULE *MXS_CREATE_MODULE() {"enable_causal_read", MXS_MODULE_PARAM_BOOL, "false"}, {"causal_read_timeout", MXS_MODULE_PARAM_STRING, "0"}, {"master_reconnection", MXS_MODULE_PARAM_BOOL, "false"}, + {"query_retry_timeout", MXS_MODULE_PARAM_COUNT, "10"}, + {"query_retry_interval", MXS_MODULE_PARAM_COUNT, "0"}, {MXS_END_MODULE_PARAMS} } }; diff --git a/server/modules/routing/readwritesplit/readwritesplit.hh b/server/modules/routing/readwritesplit/readwritesplit.hh index f6ee2db7f..08daad02f 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.hh +++ b/server/modules/routing/readwritesplit/readwritesplit.hh @@ -173,7 +173,9 @@ struct Config max_slave_connections(0), enable_causal_read(config_get_bool(params, "enable_causal_read")), causal_read_timeout(config_get_string(params, "causal_read_timeout")), - master_reconnection(config_get_bool(params, "master_reconnection")) + master_reconnection(config_get_bool(params, "master_reconnection")), + query_retry_timeout(config_get_integer(params, "query_retry_timeout")), + query_retry_interval(config_get_integer(params, "query_retry_interval")) { if (enable_causal_read) { @@ -199,8 +201,10 @@ struct Config * each connection*/ int max_slave_connections; /**< Maximum number of slaves for each connection*/ bool enable_causal_read; /**< Enable causual read */ - std::string causal_read_timeout; /**< Timetout, second parameter of function master_wait_gtid */ + std::string causal_read_timeout; /**< Timeout, second parameter of function master_wait_gtid */ bool master_reconnection; /**< Allow changes in master server */ + uint64_t query_retry_timeout; /**< Time window in which a query can be retried */ + uint64_t query_retry_interval; /**< Time window in which a query can be retried */ }; /** diff --git a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc index c57adee57..aa719e56f 100644 --- a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc +++ b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc @@ -205,8 +205,13 @@ bool RWSplitSession::route_single_stmt(GWBUF *querybuf, const RouteInfo& info) } } + MXS_SESSION* session = m_client->session; + if (succp && target) { + // We have a valid target, reset retry duration + m_retry_duration = 0; + if (!prepare_target(target, route_target)) { // The connection to target was down and we failed to reconnect @@ -235,6 +240,26 @@ bool RWSplitSession::route_single_stmt(GWBUF *querybuf, const RouteInfo& info) } } } + else if (m_config.query_retry_interval > 0 && + m_retry_duration < m_config.query_retry_timeout && + !session_trx_is_active(session)) + { + // This is a more convenient type + typedef int32_t (*DOWNSTREAMFUNC)(MXS_FILTER*, MXS_FILTER_SESSION*, GWBUF*); + + MXS_DOWNSTREAM head; + head.instance = (MXS_FILTER*)session->service->router_instance; + head.session = (MXS_FILTER_SESSION*)session->router_session; + head.routeQuery = (DOWNSTREAMFUNC)session->service->router->routeQuery; + + // Try to route the query again later + uint64_t interval = m_config.query_retry_interval; + MXS_INFO("Will try to route query again in %lu seconds", interval); + session_delay_routing(session, head, gwbuf_clone(querybuf), interval); + m_retry_duration += interval; + + succp = true; + } } if (succp && m_router->config().connection_keepalive && diff --git a/server/modules/routing/readwritesplit/rwsplitsession.cc b/server/modules/routing/readwritesplit/rwsplitsession.cc index aff308079..0c4129b11 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.cc +++ b/server/modules/routing/readwritesplit/rwsplitsession.cc @@ -36,7 +36,8 @@ RWSplitSession::RWSplitSession(RWSplit* instance, MXS_SESSION* session, m_gtid_pos(""), m_wait_gtid_state(EXPECTING_NOTHING), m_next_seq(0), - m_qc(this, session, instance->config().use_sql_variables_in) + m_qc(this, session, instance->config().use_sql_variables_in), + m_retry_duration(0) { if (m_config.rw_max_slave_conn_percent) { diff --git a/server/modules/routing/readwritesplit/rwsplitsession.hh b/server/modules/routing/readwritesplit/rwsplitsession.hh index 64cc859cc..edaae5c56 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.hh +++ b/server/modules/routing/readwritesplit/rwsplitsession.hh @@ -124,8 +124,9 @@ public: ExecMap m_exec_map; /**< Map of COM_STMT_EXECUTE statement IDs to Backends */ std::string m_gtid_pos; /**< Gtid position for causal read */ wait_gtid_state_t m_wait_gtid_state; /**< Determine boundray of wait gtid result and client query result */ - uint32_t m_next_seq; /**< Next packet'ssequence number */ + uint32_t m_next_seq; /**< Next packet's sequence number */ mxs::QueryClassifier m_qc; /**< The query classifier. */ + uint64_t m_retry_duration; /**< Total time spent retrying queries */ private: RWSplitSession(RWSplit* instance, MXS_SESSION* session,