Refactor Backend class states

The states are now internal to the Backend class. This simplifies the use
of the class by moving the burder of state tracking to the class
itself.

Refactored the way the schemarouter uses the Backend class.

Also fixed a memory leak in the schemarouter when `ignore_databases_regex`
was used..
This commit is contained in:
Markus Mäkelä
2017-06-13 22:36:08 +03:00
parent bbfd9ce136
commit 18993bc8ca
3 changed files with 137 additions and 102 deletions

View File

@ -24,22 +24,31 @@
namespace maxscale
{
/**
* The state of the backend server reference
*/
enum bref_state
{
BREF_IN_USE = 0x01,
BREF_WAITING_RESULT = 0x02, /**< for session commands only */
BREF_QUERY_ACTIVE = 0x04, /**< for other queries */
BREF_CLOSED = 0x08,
};
class Backend
{
Backend(const Backend&);
Backend& operator =(const Backend&);
public:
/**
* How is the backend being closed
*/
enum close_type
{
NORMAL,
FATAL
};
/**
* What type of a response we expect from the backend
*/
enum response_type
{
EXPECT_RESPONSE,
NO_RESPONSE
};
/**
* @brief Create new Backend
*
@ -81,20 +90,6 @@ public:
*/
size_t session_command_count() const;
/**
* @brief Clear state
*
* @param state State to clear
*/
void clear_state(enum bref_state state);
/**
* @brief Set state
*
* @param state State to set
*/
void set_state(enum bref_state state);
/**
* @brief Get pointer to server reference
*
@ -116,7 +111,7 @@ public:
*
* This will close all active connections created by the backend.
*/
void close();
void close(close_type type = NORMAL);
/**
* @brief Get a pointer to the internal DCB
@ -128,11 +123,26 @@ public:
/**
* @brief Write data to the backend server
*
* @param buffer Buffer containing the data to write
* @param buffer Buffer containing the data to write
* @param expect_response Whether to expect a response to the query
*
* @return True if data was written successfully
*/
bool write(GWBUF* buffer);
bool write(GWBUF* buffer, response_type type = EXPECT_RESPONSE);
/**
* @brief Write an authentication switch request to the backend server
*
* @param buffer Buffer containing the authentication switch request
*
* @return True if request was successfully written
*/
bool auth(GWBUF* buffer);
/**
* @brief Mark that a reply to a query was received and processed
*/
void ack_write();
/**
* @brief Store a command
@ -165,13 +175,6 @@ public:
*/
bool is_waiting_result() const;
/**
* @brief Check if a query is active
*
* @return True if a query is active
*/
bool is_query_active() const;
/**
* @brief Check if the backend is closed
*
@ -180,10 +183,35 @@ public:
bool is_closed() const;
private:
/**
* Internal state of the backend
*/
enum backend_state
{
IN_USE = 0x01, /**< Backend has been taken into use */
WAITING_RESULT = 0x02, /**< Waiting for a reply */
CLOSED = 0x04, /**< Backend is no longer in use */
FATAL_FAILURE = 0x08 /**< Backend references that should be dropped */
};
/**
* @brief Clear state
*
* @param state State to clear
*/
void clear_state(backend_state state);
/**
* @brief Set state
*
* @param state State to set
*/
void set_state(backend_state state);
bool m_closed; /**< True if a connection has been opened and closed */
SERVER_REF* m_backend; /**< Backend server */
DCB* m_dcb; /**< Backend DCB */
int m_num_result_wait; /**< Number of not yet received results */
mxs::Buffer m_pending_cmd; /**< Pending commands */
int m_state; /**< State of the backend */
SessionCommandList m_session_commands; /**< List of session commands that are

View File

@ -21,7 +21,6 @@ Backend::Backend(SERVER_REF *ref):
m_closed(false),
m_backend(ref),
m_dcb(NULL),
m_num_result_wait(0),
m_state(0)
{
}
@ -36,7 +35,7 @@ Backend::~Backend()
}
}
void Backend::close()
void Backend::close(close_type type)
{
if (!m_closed)
{
@ -47,12 +46,17 @@ void Backend::close()
CHK_DCB(m_dcb);
/** Clean operation counter in bref and in SERVER */
while (is_waiting_result())
if (is_waiting_result())
{
clear_state(BREF_WAITING_RESULT);
clear_state(WAITING_RESULT);
}
clear_state(IN_USE);
set_state(CLOSED);
if (type == FATAL)
{
set_state(FATAL_FAILURE);
}
clear_state(BREF_IN_USE);
set_state(BREF_CLOSED);
dcb_close(m_dcb);
@ -75,18 +79,22 @@ bool Backend::execute_session_command()
CHK_DCB(m_dcb);
int rc = 0;
SessionCommandList::iterator iter = m_session_commands.begin();
SessionCommand& sescmd = *(*iter);
GWBUF *buffer = sescmd.copy_buffer().release();
bool rval = false;
switch (sescmd.get_command())
{
case MYSQL_COM_QUIT:
case MYSQL_COM_STMT_CLOSE:
rval = write(buffer, NO_RESPONSE);
break;
case MYSQL_COM_CHANGE_USER:
/** This makes it possible to handle replies correctly */
gwbuf_set_type(buffer, GWBUF_TYPE_SESCMD);
rc = m_dcb->func.auth(m_dcb, NULL, m_dcb->session, buffer);
rval = auth(buffer);
break;
case MYSQL_COM_QUERY:
@ -96,11 +104,11 @@ bool Backend::execute_session_command()
* MySQL command to protocol
*/
gwbuf_set_type(buffer, GWBUF_TYPE_SESCMD);
rc = m_dcb->func.write(m_dcb, buffer);
rval = write(buffer);
break;
}
return rc == 1;
return rval;
}
void Backend::add_session_command(GWBUF* buffer, uint64_t sequence)
@ -120,32 +128,26 @@ size_t Backend::session_command_count() const
return m_session_commands.size();
}
void Backend::clear_state(enum bref_state state)
void Backend::clear_state(backend_state state)
{
if (state != BREF_WAITING_RESULT)
if ((state & WAITING_RESULT) && (m_state & WAITING_RESULT))
{
m_state &= ~state;
}
else
{
/** Decrease global operation count */
ss_debug(int prev2 = )atomic_add(&m_backend->server->stats.n_current_ops, -1);
ss_dassert(prev2 > 0);
}
m_state &= ~state;
}
void Backend::set_state(enum bref_state state)
void Backend::set_state(backend_state state)
{
if (state != BREF_WAITING_RESULT)
if ((state & WAITING_RESULT) && (m_state & WAITING_RESULT) == 0)
{
m_state |= state;
}
else
{
/** Increase global operation count */
ss_debug(int prev2 = )atomic_add(&m_backend->server->stats.n_current_ops, 1);
ss_dassert(prev2 >= 0);
}
m_state |= state;
}
SERVER_REF* Backend::backend() const
@ -159,7 +161,7 @@ bool Backend::connect(MXS_SESSION* session)
if ((m_dcb = dcb_connect(m_backend->server, session, m_backend->server->protocol)))
{
m_state = BREF_IN_USE;
m_state = IN_USE;
atomic_add(&m_backend->connections, 1);
rval = true;
}
@ -172,9 +174,35 @@ DCB* Backend::dcb() const
return m_dcb;
}
bool Backend::write(GWBUF* buffer)
bool Backend::write(GWBUF* buffer, response_type type)
{
return m_dcb->func.write(m_dcb, buffer) != 0;
bool rval = m_dcb->func.write(m_dcb, buffer) != 0;
if (rval && type == EXPECT_RESPONSE)
{
set_state(WAITING_RESULT);
}
return rval;
}
bool Backend::auth(GWBUF* buffer)
{
bool rval = false;
if (m_dcb->func.auth(m_dcb, NULL, m_dcb->session, buffer) == 1)
{
set_state(WAITING_RESULT);
rval = true;
}
return rval;
}
void Backend::ack_write()
{
ss_dassert(is_waiting_result());
clear_state(WAITING_RESULT);
}
void Backend::store_command(GWBUF* buffer)
@ -201,20 +229,15 @@ bool Backend::write_stored_command()
bool Backend::in_use() const
{
return m_state & BREF_IN_USE;
return m_state & IN_USE;
}
bool Backend::is_waiting_result() const
{
return m_num_result_wait > 0;
}
bool Backend::is_query_active() const
{
return m_state & BREF_QUERY_ACTIVE;
return m_state & WAITING_RESULT;
}
bool Backend::is_closed() const
{
return m_state & BREF_CLOSED;
return m_state & CLOSED;
}

View File

@ -97,9 +97,13 @@ void SchemaRouterSession::close()
for (SSRBackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
{
SSRBackend& bref = *it;
/** The backends are closed here to trigger the shutdown of
* the connected DCBs */
(*it)->close();
if (bref->in_use())
{
bref->close();
}
}
spinlock_acquire(&m_router->m_lock);
@ -434,20 +438,19 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket)
pPacket = NULL;
ret = 1;
}
else if ((ret = target_dcb->func.write(target_dcb, gwbuf_clone(pPacket))) == 1)
else if (bref->write(pPacket))
{
/** Add one query response waiter to backend reference */
bref->set_state(BREF_QUERY_ACTIVE);
bref->set_state(BREF_WAITING_RESULT);
atomic_add(&m_router->m_stats.n_queries, 1);
ret = 1;
}
else
{
MXS_ERROR("Routing query failed.");
gwbuf_free(pPacket);
}
}
gwbuf_free(pPacket);
return ret;
}
void SchemaRouterSession::handle_mapping_reply(SSRBackend& bref, GWBUF** pPacket)
@ -506,17 +509,6 @@ void SchemaRouterSession::process_sescmd_response(SSRBackend& bref, GWBUF** ppPa
*ppPacket = NULL;
}
}
if (*ppPacket)
{
bref->clear_state(BREF_WAITING_RESULT);
}
}
else if (bref->is_query_active())
{
bref->clear_state(BREF_QUERY_ACTIVE);
/** Set response status as replied */
bref->clear_state(BREF_WAITING_RESULT);
}
}
@ -565,6 +557,10 @@ void SchemaRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb)
{
process_sescmd_response(bref, &pPacket);
ss_dassert(bref->is_waiting_result());
/** Set response status as replied */
bref->ack_write();
if (pPacket)
{
MXS_SESSION_ROUTE_REPLY(pDcb->session, pPacket);
@ -579,8 +575,6 @@ void SchemaRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb)
else if (bref->write_stored_command())
{
atomic_add(&m_router->m_stats.n_queries, 1);
bref->set_state(BREF_QUERY_ACTIVE);
bref->set_state(BREF_WAITING_RESULT);
}
}
@ -744,19 +738,6 @@ bool SchemaRouterSession::route_session_write(GWBUF* querybuf, uint8_t command)
if ((*it)->session_command_count() == 1)
{
/** Only one command, execute it */
switch (command)
{
/** These types of commands don't generate responses */
case MYSQL_COM_QUIT:
case MYSQL_COM_STMT_CLOSE:
break;
default:
(*it)->set_state(BREF_WAITING_RESULT);
break;
}
if ((*it)->execute_session_command())
{
succp = true;
@ -781,6 +762,7 @@ bool SchemaRouterSession::route_session_write(GWBUF* querybuf, uint8_t command)
}
}
gwbuf_free(querybuf);
return succp;
}
@ -1246,6 +1228,8 @@ bool SchemaRouterSession::ignore_duplicate_database(const char* data)
{
rval = true;
}
pcre2_match_data_free(match_data);
}
return rval;