307 lines
		
	
	
		
			8.9 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			307 lines
		
	
	
		
			8.9 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /*
 | |
|  * Copyright (c) 2016 MariaDB Corporation Ab
 | |
|  *
 | |
|  * Use of this software is governed by the Business Source License included
 | |
|  * in the LICENSE.TXT file and at www.mariadb.com/bsl11.
 | |
|  *
 | |
|  * Change Date: 2022-01-01
 | |
|  *
 | |
|  * On the date above, in accordance with the Business Source License, use
 | |
|  * of this software will be governed by version 2 or later of the General
 | |
|  * Public License.
 | |
|  */
 | |
| 
 | |
| #include <maxscale/protocol/rwbackend.hh>
 | |
| 
 | |
| #include <maxscale/modutil.hh>
 | |
| #include <maxscale/protocol/mysql.hh>
 | |
| 
 | |
| namespace maxscale
 | |
| {
 | |
| 
 | |
| RWBackend::RWBackend(SERVER_REF* ref)
 | |
|     : mxs::Backend(ref)
 | |
|     , m_reply_state(REPLY_STATE_DONE)
 | |
|     , m_modutil_state{0}
 | |
|     , m_command(0)
 | |
|     , m_opening_cursor(false)
 | |
|     , m_expected_rows(0)
 | |
|     , m_local_infile_requested(false)
 | |
| {
 | |
| }
 | |
| 
 | |
| RWBackend::~RWBackend()
 | |
| {
 | |
| }
 | |
| 
 | |
| bool RWBackend::execute_session_command()
 | |
| {
 | |
|     m_command = next_session_command()->get_command();
 | |
|     bool expect_response = mxs_mysql_command_will_respond(m_command);
 | |
|     bool rval = mxs::Backend::execute_session_command();
 | |
| 
 | |
|     if (rval && expect_response)
 | |
|     {
 | |
|         set_reply_state(REPLY_STATE_START);
 | |
|     }
 | |
| 
 | |
|     return rval;
 | |
| }
 | |
| 
 | |
| bool RWBackend::continue_session_command(GWBUF* buffer)
 | |
| {
 | |
|     return Backend::write(buffer, NO_RESPONSE);
 | |
| }
 | |
| 
 | |
| void RWBackend::add_ps_handle(uint32_t id, uint32_t handle)
 | |
| {
 | |
|     m_ps_handles[id] = handle;
 | |
|     MXS_INFO("PS response for %s: %u -> %u", name(), id, handle);
 | |
| }
 | |
| 
 | |
| uint32_t RWBackend::get_ps_handle(uint32_t id) const
 | |
| {
 | |
|     BackendHandleMap::const_iterator it = m_ps_handles.find(id);
 | |
| 
 | |
|     if (it != m_ps_handles.end())
 | |
|     {
 | |
|         return it->second;
 | |
|     }
 | |
| 
 | |
|     return 0;
 | |
| }
 | |
| 
 | |
| bool RWBackend::write(GWBUF* buffer, response_type type)
 | |
| {
 | |
|     if (type == mxs::Backend::EXPECT_RESPONSE)
 | |
|     {
 | |
|         /** The server will reply to this command */
 | |
|         set_reply_state(REPLY_STATE_START);
 | |
|     }
 | |
| 
 | |
|     uint8_t cmd = mxs_mysql_get_command(buffer);
 | |
| 
 | |
|     m_command = cmd;
 | |
| 
 | |
|     if (mxs_mysql_is_ps_command(cmd))
 | |
|     {
 | |
|         uint32_t id = mxs_mysql_extract_ps_id(buffer);
 | |
|         BackendHandleMap::iterator it = m_ps_handles.find(id);
 | |
| 
 | |
|         if (it != m_ps_handles.end())
 | |
|         {
 | |
|             /** Replace the client handle with the real PS handle */
 | |
|             uint8_t* ptr = GWBUF_DATA(buffer) + MYSQL_PS_ID_OFFSET;
 | |
|             gw_mysql_set_byte4(ptr, it->second);
 | |
| 
 | |
|             if (cmd == MXS_COM_STMT_EXECUTE)
 | |
|             {
 | |
|                 // Extract the flag byte after the statement ID
 | |
|                 uint8_t flags = 0;
 | |
|                 gwbuf_copy_data(buffer, MYSQL_PS_ID_OFFSET + MYSQL_PS_ID_SIZE, 1, &flags);
 | |
| 
 | |
|                 // Any non-zero flag value means that we have an open cursor
 | |
|                 m_opening_cursor = flags != 0;
 | |
|             }
 | |
|             else if (cmd == MXS_COM_STMT_CLOSE)
 | |
|             {
 | |
|                 m_ps_handles.erase(it);
 | |
|             }
 | |
|             else if (cmd == MXS_COM_STMT_FETCH)
 | |
|             {
 | |
|                 // Number of rows to fetch is a 4 byte integer after the ID
 | |
|                 uint8_t buf[4];
 | |
|                 gwbuf_copy_data(buffer, MYSQL_PS_ID_OFFSET + MYSQL_PS_ID_SIZE, 4, buf);
 | |
|                 m_expected_rows = gw_mysql_get_byte4(buf);
 | |
|             }
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     return mxs::Backend::write(buffer, type);
 | |
| }
 | |
| 
 | |
| void RWBackend::close(close_type type)
 | |
| {
 | |
|     m_reply_state = REPLY_STATE_DONE;
 | |
|     mxs::Backend::close(type);
 | |
| }
 | |
| 
 | |
| bool RWBackend::consume_fetched_rows(GWBUF* buffer)
 | |
| {
 | |
|     m_expected_rows -= modutil_count_packets(buffer);
 | |
|     mxb_assert(m_expected_rows >= 0);
 | |
|     return m_expected_rows == 0;
 | |
| }
 | |
| 
 | |
| static inline bool have_next_packet(GWBUF* buffer)
 | |
| {
 | |
|     uint32_t len = MYSQL_GET_PAYLOAD_LEN(GWBUF_DATA(buffer)) + MYSQL_HEADER_LEN;
 | |
|     return gwbuf_length(buffer) > len;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * @brief Process a possibly partial response from the backend
 | |
|  *
 | |
|  * @param buffer  Buffer containing the response
 | |
|  */
 | |
| void RWBackend::process_reply(GWBUF* buffer)
 | |
| {
 | |
|     if (current_command() == MXS_COM_STMT_FETCH)
 | |
|     {
 | |
|         bool more = false;
 | |
|         int n_eof = modutil_count_signal_packets(buffer, 0, &more, &m_modutil_state);
 | |
| 
 | |
|         // If the server responded with an error, n_eof > 0
 | |
|         if (n_eof > 0 || consume_fetched_rows(buffer))
 | |
|         {
 | |
|             set_reply_state(REPLY_STATE_DONE);
 | |
|         }
 | |
|     }
 | |
|     else if (current_command() == MXS_COM_STATISTICS)
 | |
|     {
 | |
|         // COM_STATISTICS returns a single string and thus requires special handling
 | |
|         set_reply_state(REPLY_STATE_DONE);
 | |
|     }
 | |
|     else if (get_reply_state() == REPLY_STATE_START
 | |
|              && (!mxs_mysql_is_result_set(buffer) || GWBUF_IS_COLLECTED_RESULT(buffer)))
 | |
|     {
 | |
|         m_local_infile_requested = false;
 | |
| 
 | |
|         if (GWBUF_IS_COLLECTED_RESULT(buffer)
 | |
|             || current_command() == MXS_COM_STMT_PREPARE
 | |
|             || !mxs_mysql_is_ok_packet(buffer)
 | |
|             || !mxs_mysql_more_results_after_ok(buffer))
 | |
|         {
 | |
|             /** Not a result set, we have the complete response */
 | |
|             set_reply_state(REPLY_STATE_DONE);
 | |
| 
 | |
|             if (mxs_mysql_is_local_infile(buffer))
 | |
|             {
 | |
|                 m_local_infile_requested = true;
 | |
|             }
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|             // This is an OK packet and more results will follow
 | |
|             mxb_assert(mxs_mysql_is_ok_packet(buffer)
 | |
|                        && mxs_mysql_more_results_after_ok(buffer));
 | |
| 
 | |
|             if (have_next_packet(buffer))
 | |
|             {
 | |
|                 // TODO: Don't clone the buffer
 | |
|                 GWBUF* tmp = gwbuf_clone(buffer);
 | |
|                 tmp = gwbuf_consume(tmp, mxs_mysql_get_packet_len(tmp));
 | |
| 
 | |
|                 // Consume repeating OK packets
 | |
|                 while (mxs_mysql_more_results_after_ok(buffer) && have_next_packet(tmp))
 | |
|                 {
 | |
|                     tmp = gwbuf_consume(tmp, mxs_mysql_get_packet_len(tmp));
 | |
|                     mxb_assert(tmp);
 | |
|                 }
 | |
| 
 | |
|                 process_reply(tmp);
 | |
|                 gwbuf_free(tmp);
 | |
|                 return;
 | |
|             }
 | |
|         }
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         bool more = false;
 | |
|         int n_old_eof = get_reply_state() == REPLY_STATE_RSET_ROWS ? 1 : 0;
 | |
|         int n_eof = modutil_count_signal_packets(buffer, n_old_eof, &more, &m_modutil_state);
 | |
| 
 | |
|         if (n_eof > 2)
 | |
|         {
 | |
|             /**
 | |
|              * We have multiple results in the buffer, we only care about
 | |
|              * the state of the last one. Skip the complete result sets and act
 | |
|              * like we're processing a single result set.
 | |
|              */
 | |
|             n_eof = n_eof % 2 ? 1 : 2;
 | |
|         }
 | |
| 
 | |
|         if (n_eof == 0)
 | |
|         {
 | |
|             /** Waiting for the EOF packet after the column definitions */
 | |
|             set_reply_state(REPLY_STATE_RSET_COLDEF);
 | |
|         }
 | |
|         else if (n_eof == 1 && current_command() != MXS_COM_FIELD_LIST)
 | |
|         {
 | |
|             /** Waiting for the EOF packet after the rows */
 | |
|             set_reply_state(REPLY_STATE_RSET_ROWS);
 | |
| 
 | |
|             if (is_opening_cursor())
 | |
|             {
 | |
|                 set_cursor_opened();
 | |
|                 MXS_INFO("Cursor successfully opened");
 | |
|                 set_reply_state(REPLY_STATE_DONE);
 | |
|             }
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|             /** We either have a complete result set or a response to
 | |
|              * a COM_FIELD_LIST command */
 | |
|             mxb_assert(n_eof == 2 || (n_eof == 1 && current_command() == MXS_COM_FIELD_LIST));
 | |
|             set_reply_state(REPLY_STATE_DONE);
 | |
| 
 | |
|             if (more)
 | |
|             {
 | |
|                 /** The server will send more resultsets */
 | |
|                 set_reply_state(REPLY_STATE_START);
 | |
|             }
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     if (get_reply_state() == REPLY_STATE_DONE)
 | |
|     {
 | |
|         ack_write();
 | |
|     }
 | |
| }
 | |
| 
 | |
| ResponseStat& RWBackend::response_stat()
 | |
| {
 | |
|     return m_response_stat;
 | |
| }
 | |
| 
 | |
| void RWBackend::change_rlag_state(SERVER::RLagState new_state, int max_rlag)
 | |
| {
 | |
|     mxb_assert(new_state == SERVER::RLagState::BELOW_LIMIT || new_state == SERVER::RLagState::ABOVE_LIMIT);
 | |
|     namespace atom = maxbase::atomic;
 | |
|     auto srv = server();
 | |
|     auto old_state = atom::load(&srv->rlag_state, atom::RELAXED);
 | |
|     if (new_state != old_state)
 | |
|     {
 | |
|         atom::store(&srv->rlag_state, new_state, atom::RELAXED);
 | |
|         // State has just changed, log warning. Don't log catchup if old state was RLAG_NONE.
 | |
|         if (new_state == SERVER::RLagState::ABOVE_LIMIT)
 | |
|         {
 | |
|             MXS_WARNING("Replication lag of '%s' is %is, which is above the configured limit %is. "
 | |
|                         "'%s' is excluded from query routing.",
 | |
|                         srv->name(), srv->rlag, max_rlag, srv->name());
 | |
|         }
 | |
|         else if (old_state == SERVER::RLagState::ABOVE_LIMIT)
 | |
|         {
 | |
|             MXS_WARNING("Replication lag of '%s' is %is, which is below the configured limit %is. "
 | |
|                         "'%s' is returned to query routing.",
 | |
|                         srv->name(), srv->rlag, max_rlag, srv->name());
 | |
|         }
 | |
|     }
 | |
| }
 | |
| 
 | |
| mxs::SRWBackends RWBackend::from_servers(SERVER_REF* servers)
 | |
| {
 | |
|     SRWBackends backends;
 | |
| 
 | |
|     for (SERVER_REF* ref = servers; ref; ref = ref->next)
 | |
|     {
 | |
|         if (ref->active)
 | |
|         {
 | |
|             backends.emplace_back(new mxs::RWBackend(ref));
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     return backends;
 | |
| }
 | |
| }
 | 
