Make variables of Backend private

The variables are no longer directly manipulated by the session level
code.
This commit is contained in:
Markus Mäkelä
2017-04-03 19:28:22 +03:00
parent 0d7f987598
commit 2310381465
3 changed files with 69 additions and 30 deletions

View File

@ -21,9 +21,7 @@ Backend::Backend(SERVER_REF *ref):
m_closed(false),
m_backend(ref),
m_dcb(NULL),
m_map_queue(NULL),
m_mapped(false),
m_num_mapping_eof(0),
m_num_result_wait(0),
m_state(0)
{
@ -37,8 +35,6 @@ Backend::~Backend()
{
close();
}
gwbuf_free(m_map_queue);
}
void Backend::close()
@ -71,9 +67,9 @@ void Backend::close()
}
}
bool Backend::execute_sescmd()
bool Backend::execute_session_command()
{
if (is_closed() || m_session_commands.size() == 0)
if (is_closed() || !session_command_count())
{
return false;
}
@ -82,13 +78,6 @@ bool Backend::execute_sescmd()
int rc = 0;
/** Return if there are no pending ses commands */
if (m_session_commands.size() == 0)
{
MXS_INFO("Cursor had no pending session commands.");
return false;
}
SessionCommandList::iterator iter = m_session_commands.begin();
GWBUF *buffer = iter->copy_buffer().release();
@ -114,6 +103,23 @@ bool Backend::execute_sescmd()
return rc == 1;
}
void Backend::add_session_command(GWBUF* buffer, uint64_t sequence)
{
m_session_commands.push_back(SessionCommand(buffer, sequence));
}
uint64_t Backend::complete_session_command()
{
uint64_t rval = m_session_commands.front().get_position();
m_session_commands.pop_front();
return rval;
}
size_t Backend::session_command_count() const
{
return m_session_commands.size();
}
void Backend::clear_state(enum bref_state state)
{
if (state != BREF_WAITING_RESULT)
@ -213,6 +219,11 @@ bool Backend::is_closed() const
return m_state & BREF_CLOSED;
}
void Backend::set_mapped(bool value)
{
m_mapped = value;
}
bool Backend::is_mapped() const
{
return m_mapped;

View File

@ -127,11 +127,36 @@ public:
~Backend();
/**
* @brief Execute the next session command
* @brief Execute the next session command in the queue
*
* @return True if the command was executed successfully
*/
bool execute_sescmd();
bool execute_session_command();
/**
* @brief Add a new session command to the tail of the command queue
*
* @param buffer Session command to add
* @param sequence Sequence identifier of this session command, returned when
* the session command is completed
*/
void add_session_command(GWBUF* buffer, uint64_t sequence);
/**
* @brief Mark the current session command as successfully executed
*
* This should be called when the response to the command is received
*
* @return The sequence identifier for this session command
*/
uint64_t complete_session_command();
/**
* @brief Check if backend has session commands
*
* @return True if backend has session commands
*/
size_t session_command_count() const;
/**
* @brief Clear state
@ -231,6 +256,13 @@ public:
*/
bool is_closed() const;
/**
* @brief Set the mapping state of the backend
*
* @param value Value to set
*/
void set_mapped(bool value);
/**
* @brief Check if the backend has been mapped
*
@ -242,11 +274,7 @@ private:
bool m_closed; /**< True if a connection has been opened and closed */
SERVER_REF* m_backend; /**< Backend server */
DCB* m_dcb; /**< Backend DCB */
public:
GWBUF* m_map_queue;
bool m_mapped; /**< Whether the backend has been mapped */
int m_num_mapping_eof;
int m_num_result_wait; /**< Number of not yet received results */
Buffer m_pending_cmd; /**< Pending commands */
int m_state; /**< State of the backend */

View File

@ -426,7 +426,7 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket)
MXS_INFO("Route query to \t%s:%d <", bref->backend()->server->name, bref->backend()->server->port);
if (bref->m_session_commands.size() > 0)
if (bref->session_command_count())
{
/** Store current statement if execution of the previous
* session command hasn't been completed. */
@ -486,13 +486,14 @@ void SchemaRouterSession::handle_mapping_reply(SBackend& bref, GWBUF** pPacket)
void SchemaRouterSession::process_response(SBackend& bref, GWBUF** ppPacket)
{
if (bref->m_session_commands.size() > 0)
if (bref->session_command_count())
{
/** We are executing a session command */
if (GWBUF_IS_TYPE_SESCMD_RESPONSE((*ppPacket)))
{
if (m_replied_sescmd < m_sent_sescmd &&
bref->m_session_commands.front().get_position() == m_replied_sescmd + 1)
uint64_t id = bref->complete_session_command();
if (m_replied_sescmd < m_sent_sescmd && id == m_replied_sescmd + 1)
{
/** First reply to this session command, route it to the client */
++m_replied_sescmd;
@ -504,8 +505,6 @@ void SchemaRouterSession::process_response(SBackend& bref, GWBUF** ppPacket)
gwbuf_free(*ppPacket);
*ppPacket = NULL;
}
bref->m_session_commands.pop_front();
}
if (*ppPacket)
@ -572,7 +571,7 @@ void SchemaRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb)
pPacket = NULL;
}
if (bref->execute_sescmd())
if (bref->execute_session_command())
{
MXS_INFO("Backend %s:%d processed reply and starts to execute active cursor.",
bref->backend()->server->name, bref->backend()->server->port);
@ -732,7 +731,8 @@ bool SchemaRouterSession::route_session_write(GWBUF* querybuf, uint8_t command)
if ((*it)->in_use())
{
GWBUF *buffer = gwbuf_clone(querybuf);
(*it)->m_session_commands.push_back(SessionCommand(buffer, m_sent_sescmd));
(*it)->add_session_command(buffer, m_sent_sescmd);
if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO))
{
@ -742,7 +742,7 @@ bool SchemaRouterSession::route_session_write(GWBUF* querybuf, uint8_t command)
(*it)->backend()->server->port);
}
if ((*it)->m_session_commands.size() == 1)
if ((*it)->session_command_count() == 1)
{
/** Only one command, execute it */
switch (command)
@ -757,7 +757,7 @@ bool SchemaRouterSession::route_session_write(GWBUF* querybuf, uint8_t command)
break;
}
if ((*it)->execute_sescmd())
if ((*it)->execute_session_command())
{
succp = true;
}
@ -771,7 +771,7 @@ bool SchemaRouterSession::route_session_write(GWBUF* querybuf, uint8_t command)
}
else
{
ss_dassert((*it)->m_session_commands.size() > 1);
ss_dassert((*it)->session_command_count() > 1);
/** The server is already executing a session command */
MXS_INFO("Backend %s:%d already executing sescmd.",
(*it)->backend()->server->name,