MXS-1506: Add router to MXS_DOWNSTREAM helper function
The function makes it easier to deal with the delayed query routing as well as removing redundancy in the code.
This commit is contained in:
@ -688,4 +688,13 @@ session_dump_statements_t session_get_dump_statements();
|
|||||||
*/
|
*/
|
||||||
bool session_delay_routing(MXS_SESSION* session, MXS_DOWNSTREAM down, GWBUF* buffer, int seconds);
|
bool session_delay_routing(MXS_SESSION* session, MXS_DOWNSTREAM down, GWBUF* buffer, int seconds);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cast the session's router as a MXS_DOWNSTREAM object
|
||||||
|
*
|
||||||
|
* @param session The session to use
|
||||||
|
*
|
||||||
|
* @return The router cast as MXS_DOWNSTREAM
|
||||||
|
*/
|
||||||
|
MXS_DOWNSTREAM router_as_downstream(MXS_SESSION* session);
|
||||||
|
|
||||||
MXS_END_DECLS
|
MXS_END_DECLS
|
||||||
|
|||||||
@ -187,10 +187,7 @@ static MXS_SESSION* session_alloc_body(SERVICE* service, DCB* client_dcb,
|
|||||||
// NOTE: the router routeQuery into a filter routeQuery. That
|
// NOTE: the router routeQuery into a filter routeQuery. That
|
||||||
// NOTE: is in order to be able to treat the router as the first
|
// NOTE: is in order to be able to treat the router as the first
|
||||||
// NOTE: filter.
|
// NOTE: filter.
|
||||||
session->head.instance = (MXS_FILTER*)service->router_instance;
|
session->head = router_as_downstream(session);
|
||||||
session->head.session = (MXS_FILTER_SESSION*)session->router_session;
|
|
||||||
session->head.routeQuery =
|
|
||||||
(int32_t (*)(MXS_FILTER*, MXS_FILTER_SESSION*, GWBUF*))service->router->routeQuery;
|
|
||||||
|
|
||||||
// NOTE: Here we cast the session into a MXS_FILTER and MXS_FILTER_SESSION
|
// NOTE: Here we cast the session into a MXS_FILTER and MXS_FILTER_SESSION
|
||||||
// NOTE: and session_reply into a filter clientReply. That's dubious but ok
|
// NOTE: and session_reply into a filter clientReply. That's dubious but ok
|
||||||
@ -1463,3 +1460,12 @@ bool session_delay_routing(MXS_SESSION* session, MXS_DOWNSTREAM down, GWBUF* buf
|
|||||||
|
|
||||||
return success;
|
return success;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
MXS_DOWNSTREAM router_as_downstream(MXS_SESSION* session)
|
||||||
|
{
|
||||||
|
MXS_DOWNSTREAM head;
|
||||||
|
head.instance = (MXS_FILTER*)session->service->router_instance;
|
||||||
|
head.session = (MXS_FILTER_SESSION*)session->router_session;
|
||||||
|
head.routeQuery = (DOWNSTREAMFUNC)session->service->router->routeQuery;
|
||||||
|
return head;
|
||||||
|
}
|
||||||
|
|||||||
@ -244,21 +244,14 @@ bool RWSplitSession::route_single_stmt(GWBUF *querybuf, const RouteInfo& info)
|
|||||||
m_retry_duration < m_config.query_retry_timeout &&
|
m_retry_duration < m_config.query_retry_timeout &&
|
||||||
!session_trx_is_active(session))
|
!session_trx_is_active(session))
|
||||||
{
|
{
|
||||||
// This is a more convenient type
|
|
||||||
typedef int32_t (*DOWNSTREAMFUNC)(MXS_FILTER*, MXS_FILTER_SESSION*, GWBUF*);
|
|
||||||
|
|
||||||
MXS_DOWNSTREAM head;
|
|
||||||
head.instance = (MXS_FILTER*)session->service->router_instance;
|
|
||||||
head.session = (MXS_FILTER_SESSION*)session->router_session;
|
|
||||||
head.routeQuery = (DOWNSTREAMFUNC)session->service->router->routeQuery;
|
|
||||||
|
|
||||||
// Try to route the query again later
|
// Try to route the query again later
|
||||||
uint64_t interval = m_config.query_retry_interval;
|
uint64_t interval = m_config.query_retry_interval;
|
||||||
MXS_INFO("Will try to route query again in %lu seconds", interval);
|
session_delay_routing(session, router_as_downstream(session),
|
||||||
session_delay_routing(session, head, gwbuf_clone(querybuf), interval);
|
gwbuf_clone(querybuf), interval);
|
||||||
m_retry_duration += interval;
|
m_retry_duration += interval;
|
||||||
|
|
||||||
succp = true;
|
succp = true;
|
||||||
|
|
||||||
|
MXS_INFO("Will try to route query again in %lu seconds", interval);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user