 28609a2c77
			
		
	
	28609a2c77
	
	
	
		
			
			With the removal of the old session command implementation, the code that used it can be removed or replaced with newer constructs. As a result, the backend protocol no longer does any session command processing. The three buffer types, GWBUF_TYPE_SESCMD_RESPONSE, GWBUF_TYPE_RESPONSE_END and GWBUF_TYPE_SESCMD as well as their related macros are no longer used and can be removed.
		
			
				
	
	
		
			261 lines
		
	
	
		
			5.4 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			261 lines
		
	
	
		
			5.4 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /*
 | |
|  * Copyright (c) 2016 MariaDB Corporation Ab
 | |
|  *
 | |
|  * Use of this software is governed by the Business Source License included
 | |
|  * in the LICENSE.TXT file and at www.mariadb.com/bsl11.
 | |
|  *
 | |
|  * Change Date: 2022-01-01
 | |
|  *
 | |
|  * On the date above, in accordance with the Business Source License, use
 | |
|  * of this software will be governed by version 2 or later of the General
 | |
|  * Public License.
 | |
|  */
 | |
| 
 | |
| #include <maxscale/backend.hh>
 | |
| 
 | |
| #include <sstream>
 | |
| 
 | |
| #include <maxscale/protocol/mysql.h>
 | |
| #include <maxscale/debug.h>
 | |
| 
 | |
| using namespace maxscale;
 | |
| 
 | |
| Backend::Backend(SERVER_REF *ref):
 | |
|     m_closed(false),
 | |
|     m_backend(ref),
 | |
|     m_dcb(NULL),
 | |
|     m_state(0)
 | |
| {
 | |
|     std::stringstream ss;
 | |
|     ss << "[" << server()->address << "]:" << server()->port;
 | |
|     m_uri = ss.str();
 | |
| }
 | |
| 
 | |
| Backend::~Backend()
 | |
| {
 | |
|     ss_dassert(m_closed || !in_use());
 | |
| 
 | |
|     if (in_use())
 | |
|     {
 | |
|         close();
 | |
|     }
 | |
| }
 | |
| 
 | |
| void Backend::close(close_type type)
 | |
| {
 | |
|     ss_dassert(m_dcb->n_close == 0);
 | |
| 
 | |
|     if (!m_closed)
 | |
|     {
 | |
|         m_closed = true;
 | |
| 
 | |
|         if (in_use())
 | |
|         {
 | |
|             CHK_DCB(m_dcb);
 | |
| 
 | |
|             /** Clean operation counter in bref and in SERVER */
 | |
|             if (is_waiting_result())
 | |
|             {
 | |
|                 clear_state(WAITING_RESULT);
 | |
|             }
 | |
|             clear_state(IN_USE);
 | |
| 
 | |
|             if (type == CLOSE_FATAL)
 | |
|             {
 | |
|                 set_state(FATAL_FAILURE);
 | |
|             }
 | |
| 
 | |
|             dcb_close(m_dcb);
 | |
|             m_dcb = NULL;
 | |
| 
 | |
|             /** decrease server current connection counters */
 | |
|             atomic_add(&m_backend->connections, -1);
 | |
|         }
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         ss_dassert(false);
 | |
|     }
 | |
| }
 | |
| 
 | |
| bool Backend::execute_session_command()
 | |
| {
 | |
|     if (is_closed() || !has_session_commands())
 | |
|     {
 | |
|         return false;
 | |
|     }
 | |
| 
 | |
|     CHK_DCB(m_dcb);
 | |
| 
 | |
|     SSessionCommand& sescmd = m_session_commands.front();
 | |
|     GWBUF *buffer = sescmd->deep_copy_buffer();
 | |
|     bool rval = false;
 | |
| 
 | |
|     switch (sescmd->get_command())
 | |
|     {
 | |
|     case MXS_COM_QUIT:
 | |
|     case MXS_COM_STMT_CLOSE:
 | |
|     case MXS_COM_STMT_SEND_LONG_DATA:
 | |
|         /** These commands do not generate responses */
 | |
|         rval = write(buffer, NO_RESPONSE);
 | |
|         complete_session_command();
 | |
|         ss_dassert(!is_waiting_result());
 | |
|         break;
 | |
| 
 | |
|     case MXS_COM_CHANGE_USER:
 | |
|         rval = auth(buffer);
 | |
|         break;
 | |
| 
 | |
|     case MXS_COM_QUERY:
 | |
|     default:
 | |
|         // We want the complete response in one packet
 | |
|         gwbuf_set_type(buffer, GWBUF_TYPE_COLLECT_RESULT);
 | |
|         rval = write(buffer, EXPECT_RESPONSE);
 | |
|         ss_dassert(is_waiting_result());
 | |
|         break;
 | |
|     }
 | |
| 
 | |
|     return rval;
 | |
| }
 | |
| 
 | |
| void Backend::append_session_command(GWBUF* buffer, uint64_t sequence)
 | |
| {
 | |
|     append_session_command(SSessionCommand(new SessionCommand(buffer, sequence)));
 | |
| }
 | |
| 
 | |
| void Backend::append_session_command(const SSessionCommand& sescmd)
 | |
| {
 | |
|     m_session_commands.push_back(sescmd);
 | |
| }
 | |
| 
 | |
| void Backend::append_session_command(const SessionCommandList& sescmdlist)
 | |
| {
 | |
|     m_session_commands.insert(m_session_commands.end(), sescmdlist.begin(), sescmdlist.end());
 | |
| }
 | |
| 
 | |
| uint64_t Backend::complete_session_command()
 | |
| {
 | |
|     uint64_t rval = m_session_commands.front()->get_position();
 | |
|     m_session_commands.pop_front();
 | |
|     return rval;
 | |
| }
 | |
| 
 | |
| size_t Backend::session_command_count() const
 | |
| {
 | |
|     return m_session_commands.size();
 | |
| }
 | |
| 
 | |
| const SSessionCommand& Backend::next_session_command() const
 | |
| {
 | |
|     ss_dassert(has_session_commands());
 | |
|     return m_session_commands.front();
 | |
| }
 | |
| 
 | |
| void Backend::clear_state(backend_state state)
 | |
| {
 | |
|     if ((state & WAITING_RESULT) && (m_state & WAITING_RESULT))
 | |
|     {
 | |
|         ss_debug(int prev2 = )atomic_add(&m_backend->server->stats.n_current_ops, -1);
 | |
|         ss_dassert(prev2 > 0);
 | |
|     }
 | |
| 
 | |
|     m_state &= ~state;
 | |
| }
 | |
| 
 | |
| void Backend::set_state(backend_state state)
 | |
| {
 | |
|     if ((state & WAITING_RESULT) && (m_state & WAITING_RESULT) == 0)
 | |
|     {
 | |
|         ss_debug(int prev2 = )atomic_add(&m_backend->server->stats.n_current_ops, 1);
 | |
|         ss_dassert(prev2 >= 0);
 | |
|     }
 | |
| 
 | |
|     m_state |= state;
 | |
| }
 | |
| 
 | |
| bool Backend::connect(MXS_SESSION* session, SessionCommandList* sescmd)
 | |
| {
 | |
|     ss_dassert(!in_use());
 | |
|     bool rval = false;
 | |
| 
 | |
|     if ((m_dcb = dcb_connect(m_backend->server, session, m_backend->server->protocol)))
 | |
|     {
 | |
|         m_closed = false;
 | |
|         m_state = IN_USE;
 | |
|         atomic_add(&m_backend->connections, 1);
 | |
|         rval = true;
 | |
| 
 | |
|         if (sescmd && sescmd->size())
 | |
|         {
 | |
|             append_session_command(*sescmd);
 | |
| 
 | |
|             if (!execute_session_command())
 | |
|             {
 | |
|                 rval = false;
 | |
|             }
 | |
|         }
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         m_state = FATAL_FAILURE;
 | |
|     }
 | |
| 
 | |
|     return rval;
 | |
| }
 | |
| 
 | |
| bool Backend::write(GWBUF* buffer, response_type type)
 | |
| {
 | |
|     ss_dassert(in_use());
 | |
|     bool rval = m_dcb->func.write(m_dcb, buffer) != 0;
 | |
| 
 | |
|     if (rval && type == EXPECT_RESPONSE)
 | |
|     {
 | |
|         set_state(WAITING_RESULT);
 | |
|     }
 | |
| 
 | |
|     return rval;
 | |
| }
 | |
| 
 | |
| bool Backend::auth(GWBUF* buffer)
 | |
| {
 | |
|     ss_dassert(in_use());
 | |
|     bool rval = false;
 | |
| 
 | |
|     if (m_dcb->func.auth(m_dcb, NULL, m_dcb->session, buffer) == 1)
 | |
|     {
 | |
|         set_state(WAITING_RESULT);
 | |
|         rval = true;
 | |
|     }
 | |
| 
 | |
|     return rval;
 | |
| }
 | |
| 
 | |
| void Backend::ack_write()
 | |
| {
 | |
|     ss_dassert(is_waiting_result());
 | |
|     clear_state(WAITING_RESULT);
 | |
| }
 | |
| 
 | |
| void Backend::store_command(GWBUF* buffer)
 | |
| {
 | |
|     m_pending_cmd.reset(buffer);
 | |
| }
 | |
| 
 | |
| bool Backend::write_stored_command()
 | |
| {
 | |
|     ss_dassert(in_use());
 | |
|     bool rval = false;
 | |
| 
 | |
|     if (m_pending_cmd.length())
 | |
|     {
 | |
|         rval = write(m_pending_cmd.release());
 | |
| 
 | |
|         if (!rval)
 | |
|         {
 | |
|             MXS_ERROR("Routing of pending query failed.");
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     return rval;
 | |
| }
 |