
As the session commands are always appended to the end of the list, the name should reflect that action. For this reason, the function was renamed to append_session_command. Readwritesplit supports replacement of slave servers by storing all executed session commands in a list. To make the copying of this list a bit cleaner, an overload for a list of session commands was added. This will allow relatively smooth addition of server replacement to all router modules that use the Backend class.
286 lines
5.6 KiB
C++
286 lines
5.6 KiB
C++
/*
|
|
* Copyright (c) 2016 MariaDB Corporation Ab
|
|
*
|
|
* Use of this software is governed by the Business Source License included
|
|
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
|
|
*
|
|
* Change Date: 2020-01-01
|
|
*
|
|
* On the date above, in accordance with the Business Source License, use
|
|
* of this software will be governed by version 2 or later of the General
|
|
* Public License.
|
|
*/
|
|
|
|
#include <maxscale/backend.hh>
|
|
#include <maxscale/protocol/mysql.h>
|
|
#include <maxscale/debug.h>
|
|
|
|
using namespace maxscale;
|
|
|
|
Backend::Backend(SERVER_REF *ref):
|
|
m_closed(false),
|
|
m_backend(ref),
|
|
m_dcb(NULL),
|
|
m_state(0)
|
|
{
|
|
}
|
|
|
|
Backend::~Backend()
|
|
{
|
|
ss_dassert(m_closed);
|
|
|
|
if (!m_closed)
|
|
{
|
|
close();
|
|
}
|
|
}
|
|
|
|
void Backend::close(close_type type)
|
|
{
|
|
if (!m_closed)
|
|
{
|
|
m_closed = true;
|
|
|
|
if (in_use())
|
|
{
|
|
CHK_DCB(m_dcb);
|
|
|
|
/** Clean operation counter in bref and in SERVER */
|
|
if (is_waiting_result())
|
|
{
|
|
clear_state(WAITING_RESULT);
|
|
}
|
|
clear_state(IN_USE);
|
|
set_state(CLOSED);
|
|
|
|
if (type == CLOSE_FATAL)
|
|
{
|
|
set_state(FATAL_FAILURE);
|
|
}
|
|
|
|
dcb_close(m_dcb);
|
|
|
|
/** decrease server current connection counters */
|
|
atomic_add(&m_backend->connections, -1);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
ss_dassert(false);
|
|
}
|
|
}
|
|
|
|
bool Backend::execute_session_command()
|
|
{
|
|
if (is_closed() || !session_command_count())
|
|
{
|
|
return false;
|
|
}
|
|
|
|
CHK_DCB(m_dcb);
|
|
|
|
SessionCommandList::iterator iter = m_session_commands.begin();
|
|
SessionCommand& sescmd = *(*iter);
|
|
GWBUF *buffer = sescmd.copy_buffer().release();
|
|
bool rval = false;
|
|
|
|
switch (sescmd.get_command())
|
|
{
|
|
case MYSQL_COM_QUIT:
|
|
case MYSQL_COM_STMT_CLOSE:
|
|
rval = write(buffer, NO_RESPONSE);
|
|
break;
|
|
|
|
case MYSQL_COM_CHANGE_USER:
|
|
/** This makes it possible to handle replies correctly */
|
|
gwbuf_set_type(buffer, GWBUF_TYPE_SESCMD);
|
|
rval = auth(buffer);
|
|
break;
|
|
|
|
case MYSQL_COM_QUERY:
|
|
default:
|
|
/**
|
|
* Mark session command buffer, it triggers writing
|
|
* MySQL command to protocol
|
|
*/
|
|
gwbuf_set_type(buffer, GWBUF_TYPE_SESCMD);
|
|
rval = write(buffer);
|
|
break;
|
|
}
|
|
|
|
return rval;
|
|
}
|
|
|
|
void Backend::append_session_command(GWBUF* buffer, uint64_t sequence)
|
|
{
|
|
m_session_commands.push_back(SSessionCommand(new SessionCommand(buffer, sequence)));
|
|
}
|
|
|
|
void Backend::append_session_command(const SSessionCommand& sescmd)
|
|
{
|
|
m_session_commands.push_back(sescmd);
|
|
}
|
|
|
|
void Backend::append_session_command(const SessionCommandList& sescmdlist)
|
|
{
|
|
m_session_commands.insert(m_session_commands.end(), sescmdlist.begin(), sescmdlist.end());
|
|
}
|
|
|
|
uint64_t Backend::complete_session_command()
|
|
{
|
|
uint64_t rval = m_session_commands.front()->get_position();
|
|
m_session_commands.pop_front();
|
|
return rval;
|
|
}
|
|
|
|
size_t Backend::session_command_count() const
|
|
{
|
|
return m_session_commands.size();
|
|
}
|
|
|
|
const SSessionCommand& Backend::next_session_command() const
|
|
{
|
|
ss_dassert(session_command_count() > 0);
|
|
return m_session_commands.front();
|
|
}
|
|
|
|
void Backend::clear_state(backend_state state)
|
|
{
|
|
if ((state & WAITING_RESULT) && (m_state & WAITING_RESULT))
|
|
{
|
|
ss_debug(int prev2 = )atomic_add(&m_backend->server->stats.n_current_ops, -1);
|
|
ss_dassert(prev2 > 0);
|
|
}
|
|
|
|
m_state &= ~state;
|
|
}
|
|
|
|
void Backend::set_state(backend_state state)
|
|
{
|
|
if ((state & WAITING_RESULT) && (m_state & WAITING_RESULT) == 0)
|
|
{
|
|
ss_debug(int prev2 = )atomic_add(&m_backend->server->stats.n_current_ops, 1);
|
|
ss_dassert(prev2 >= 0);
|
|
}
|
|
|
|
m_state |= state;
|
|
}
|
|
|
|
SERVER_REF* Backend::backend() const
|
|
{
|
|
ss_dassert(m_backend);
|
|
return m_backend;
|
|
}
|
|
|
|
SERVER* Backend::server() const
|
|
{
|
|
ss_dassert(m_backend);
|
|
return m_backend->server;
|
|
}
|
|
|
|
bool Backend::can_connect() const
|
|
{
|
|
return !has_failed() && SERVER_IS_RUNNING(m_backend->server);
|
|
}
|
|
|
|
bool Backend::connect(MXS_SESSION* session)
|
|
{
|
|
bool rval = false;
|
|
|
|
if ((m_dcb = dcb_connect(m_backend->server, session, m_backend->server->protocol)))
|
|
{
|
|
m_state = IN_USE;
|
|
atomic_add(&m_backend->connections, 1);
|
|
rval = true;
|
|
}
|
|
else
|
|
{
|
|
m_state = FATAL_FAILURE;
|
|
}
|
|
|
|
return rval;
|
|
}
|
|
|
|
DCB* Backend::dcb() const
|
|
{
|
|
return m_dcb;
|
|
}
|
|
|
|
bool Backend::write(GWBUF* buffer, response_type type)
|
|
{
|
|
bool rval = m_dcb->func.write(m_dcb, buffer) != 0;
|
|
|
|
if (rval && type == EXPECT_RESPONSE)
|
|
{
|
|
set_state(WAITING_RESULT);
|
|
}
|
|
|
|
return rval;
|
|
}
|
|
|
|
bool Backend::auth(GWBUF* buffer)
|
|
{
|
|
bool rval = false;
|
|
|
|
if (m_dcb->func.auth(m_dcb, NULL, m_dcb->session, buffer) == 1)
|
|
{
|
|
set_state(WAITING_RESULT);
|
|
rval = true;
|
|
}
|
|
|
|
return rval;
|
|
}
|
|
|
|
void Backend::ack_write()
|
|
{
|
|
ss_dassert(is_waiting_result());
|
|
clear_state(WAITING_RESULT);
|
|
}
|
|
|
|
void Backend::store_command(GWBUF* buffer)
|
|
{
|
|
m_pending_cmd.reset(buffer);
|
|
}
|
|
|
|
bool Backend::write_stored_command()
|
|
{
|
|
bool rval = false;
|
|
|
|
if (m_pending_cmd.length())
|
|
{
|
|
rval = write(m_pending_cmd.release());
|
|
|
|
if (!rval)
|
|
{
|
|
MXS_ERROR("Routing of pending query failed.");
|
|
}
|
|
}
|
|
|
|
return rval;
|
|
}
|
|
|
|
bool Backend::in_use() const
|
|
{
|
|
return m_state & IN_USE;
|
|
}
|
|
|
|
bool Backend::is_active() const
|
|
{
|
|
return SERVER_REF_IS_ACTIVE(m_backend);
|
|
}
|
|
|
|
bool Backend::is_waiting_result() const
|
|
{
|
|
return m_state & WAITING_RESULT;
|
|
}
|
|
|
|
bool Backend::is_closed() const
|
|
{
|
|
return m_state & CLOSED;
|
|
}
|
|
|
|
bool Backend::has_failed() const
|
|
{
|
|
return m_state & FATAL_FAILURE;
|
|
}
|