630 lines
		
	
	
		
			18 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			630 lines
		
	
	
		
			18 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/*
 | 
						|
Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights
 | 
						|
reserved.
 | 
						|
Copyright (c) 2013, SkySQL Ab
 | 
						|
 | 
						|
Portions of this file contain modifications contributed and copyrighted by
 | 
						|
SkySQL, Ab. Those modifications are gratefully acknowledged and are described
 | 
						|
briefly in the source code.
 | 
						|
 | 
						|
This program is free software; you can redistribute it and/or
 | 
						|
modify it under the terms of the GNU General Public License
 | 
						|
as published by the Free Software Foundation; version 2 of
 | 
						|
the License.
 | 
						|
 | 
						|
This program is distributed in the hope that it will be useful,
 | 
						|
but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 | 
						|
GNU General Public License for more details.
 | 
						|
 | 
						|
You should have received a copy of the GNU General Public License
 | 
						|
along with this program; if not, write to the Free Software
 | 
						|
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 | 
						|
02110-1301  USA
 | 
						|
*/
 | 
						|
/*
 | 
						|
SkySQL change details:
 | 
						|
- Added support for GTID event handling for both MySQL and MariaDB
 | 
						|
 | 
						|
Author: Jan Lindström (jan.lindstrom@skysql.com
 | 
						|
 | 
						|
*/
 | 
						|
#include <stdint.h>
 | 
						|
#include <boost/array.hpp>
 | 
						|
#include <vector>
 | 
						|
 | 
						|
#include "protocol.h"
 | 
						|
#include "listener_exception.h"
 | 
						|
#include <iostream>
 | 
						|
#include <mysql.h>
 | 
						|
#include <my_global.h>
 | 
						|
#include <mysql_com.h>
 | 
						|
 | 
						|
using namespace mysql;
 | 
						|
using namespace mysql::system;
 | 
						|
 | 
						|
namespace mysql { namespace system {
 | 
						|
 | 
						|
int proto_read_package_header(tcp::socket *socket, unsigned long *packet_length, unsigned char *packet_no)
 | 
						|
{
 | 
						|
  unsigned char buf[4];
 | 
						|
 | 
						|
  try {
 | 
						|
    boost::asio::read(*socket, boost::asio::buffer(buf, 4),
 | 
						|
                      boost::asio::transfer_at_least(4));
 | 
						|
  } 
 | 
						|
  catch (boost::system::system_error const& e)
 | 
						|
  {
 | 
						|
    throw(ListenerException("Read package header error: " + std::string(e.what()), __FILE__, __LINE__));
 | 
						|
  }
 | 
						|
  catch (boost::system::error_code const& e)
 | 
						|
  {
 | 
						|
    throw(ListenerException("Read package header error: " + e.message(), __FILE__, __LINE__));
 | 
						|
  }
 | 
						|
  catch (std::exception& e)
 | 
						|
  {
 | 
						|
    throw(ListenerException("Read package header error: " + std::string(e.what()), __FILE__, __LINE__));
 | 
						|
  }
 | 
						|
 | 
						|
  *packet_length=  (unsigned long)(buf[0] &0xFF);
 | 
						|
  *packet_length+= (unsigned long)((buf[1] &0xFF)<<8);
 | 
						|
  *packet_length+= (unsigned long)((buf[2] &0xFF)<<16);
 | 
						|
  *packet_no= (unsigned char)buf[3];
 | 
						|
 | 
						|
  return 0;
 | 
						|
}
 | 
						|
 | 
						|
int proto_read_package_header(tcp::socket *socket, boost::asio::streambuf &buff, unsigned long *packet_length, unsigned char *packet_no)
 | 
						|
{
 | 
						|
  std::streamsize inbuff= buff.in_avail();
 | 
						|
  if( inbuff < 0)
 | 
						|
    inbuff= 0;
 | 
						|
 | 
						|
  if (4 > inbuff)
 | 
						|
  {
 | 
						|
    try {
 | 
						|
      boost::asio::read(*socket, buff,
 | 
						|
                        boost::asio::transfer_at_least(4-inbuff));
 | 
						|
    } 
 | 
						|
    catch (boost::system::system_error const& e)
 | 
						|
    {
 | 
						|
      throw(ListenerException("Read package header error: " + std::string(e.what()), __FILE__, __LINE__));
 | 
						|
    }
 | 
						|
    catch (boost::system::error_code const& e)
 | 
						|
    {
 | 
						|
      throw(ListenerException("Read package header error: " + e.message(), __FILE__, __LINE__));
 | 
						|
    }
 | 
						|
    catch (std::exception& e)
 | 
						|
    {
 | 
						|
      throw(ListenerException("Read package header error: " + std::string(e.what()), __FILE__, __LINE__));
 | 
						|
    }
 | 
						|
  }
 | 
						|
  char ch;
 | 
						|
  std::istream is(&buff);
 | 
						|
  is.get(ch);
 | 
						|
  *packet_length= (unsigned long)ch;
 | 
						|
  is.get(ch);
 | 
						|
  *packet_length+= (unsigned long)(ch<<8);
 | 
						|
  is.get(ch);
 | 
						|
  *packet_length+= (unsigned long)(ch<<16);
 | 
						|
  is.get(ch);
 | 
						|
  *packet_no= (unsigned char)ch;
 | 
						|
 | 
						|
 | 
						|
  return 0;
 | 
						|
}
 | 
						|
 | 
						|
 | 
						|
int proto_get_one_package(tcp::socket *socket, boost::asio::streambuf &buff,
 | 
						|
                          boost::uint8_t *packet_no)
 | 
						|
{
 | 
						|
  unsigned long packet_length;
 | 
						|
  if (proto_read_package_header(socket, buff, &packet_length, packet_no))
 | 
						|
    return 0;
 | 
						|
  std::streamsize inbuffer= buff.in_avail();
 | 
						|
  if (inbuffer < 0)
 | 
						|
    inbuffer= 0;
 | 
						|
  if (packet_length > inbuffer)
 | 
						|
    boost::asio::read(*socket, buff,
 | 
						|
                      boost::asio::transfer_at_least(packet_length-inbuffer));
 | 
						|
 | 
						|
  return packet_length;
 | 
						|
}
 | 
						|
 | 
						|
void prot_parse_error_message(std::istream &is, struct st_error_package &err,
 | 
						|
                              int packet_length)
 | 
						|
{
 | 
						|
  boost::uint8_t marker;
 | 
						|
 | 
						|
  Protocol_chunk<boost::uint16_t> prot_errno(err.error_code);
 | 
						|
  Protocol_chunk<boost::uint8_t>  prot_marker(marker);
 | 
						|
  Protocol_chunk<boost::uint8_t>  prot_sql_state(err.sql_state,5);
 | 
						|
 | 
						|
  is >> prot_errno
 | 
						|
     >> prot_marker
 | 
						|
     >> prot_sql_state;
 | 
						|
 | 
						|
    // TODO is the number of bytes read = is.tellg() ?
 | 
						|
 | 
						|
  int message_size= packet_length -2 -1 -5; // the remaining part of the package
 | 
						|
  Protocol_chunk_string prot_message(err.message, message_size);
 | 
						|
  is >> prot_message;
 | 
						|
  err.message[message_size]= '\0';
 | 
						|
}
 | 
						|
 | 
						|
void prot_parse_ok_message(std::istream &is, struct st_ok_package &ok, int packet_length)
 | 
						|
{
 | 
						|
 // TODO: Assure that zero length messages can be but on the input stream.
 | 
						|
 | 
						|
  //Protocol_chunk<boost::uint8_t>  prot_result_type(result_type);
 | 
						|
  Protocol_chunk<boost::uint64_t> prot_affected_rows(ok.affected_rows);
 | 
						|
  Protocol_chunk<boost::uint64_t> prot_insert_id(ok.insert_id);
 | 
						|
  Protocol_chunk<boost::uint16_t> prot_server_status(ok.server_status);
 | 
						|
  Protocol_chunk<boost::uint16_t> prot_warning_count(ok.warning_count);
 | 
						|
 | 
						|
  int message_size= packet_length -2  -prot_affected_rows.size()
 | 
						|
                    -prot_insert_id.size() -prot_server_status.size()
 | 
						|
                    -prot_warning_count.size();
 | 
						|
 | 
						|
  prot_affected_rows.set_length_encoded_binary(true);
 | 
						|
  prot_insert_id.set_length_encoded_binary(true);
 | 
						|
 | 
						|
  is  >> prot_affected_rows
 | 
						|
      >> prot_insert_id
 | 
						|
      >> prot_server_status
 | 
						|
      >> prot_warning_count;
 | 
						|
 | 
						|
  if (message_size > 0)
 | 
						|
  {
 | 
						|
    Protocol_chunk_string prot_message(ok.message, message_size);
 | 
						|
    is >> prot_message;
 | 
						|
    ok.message[message_size]= '\0';
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
void prot_parse_eof_message(std::istream &is, struct st_eof_package &eof)
 | 
						|
{
 | 
						|
  Protocol_chunk<boost::uint16_t> proto_warning_count(eof.warning_count);
 | 
						|
  Protocol_chunk<boost::uint16_t> proto_status_flags(eof.status_flags);
 | 
						|
 | 
						|
  is >> proto_warning_count
 | 
						|
     >> proto_status_flags;
 | 
						|
}
 | 
						|
 | 
						|
void proto_get_handshake_package(std::istream &is,
 | 
						|
                                 struct st_handshake_package &p,
 | 
						|
                                 int packet_length)
 | 
						|
{
 | 
						|
  boost::uint8_t filler;
 | 
						|
  boost::uint8_t filler2[13];
 | 
						|
 | 
						|
  Protocol_chunk<boost::uint8_t>   proto_protocol_version(p.protocol_version);
 | 
						|
  Protocol_chunk<boost::uint32_t>  proto_thread_id(p.thread_id);
 | 
						|
  Protocol_chunk<boost::uint8_t>   proto_scramble_buffer(p.scramble_buff, 8);
 | 
						|
  Protocol_chunk<boost::uint8_t>   proto_filler(filler);
 | 
						|
  Protocol_chunk<boost::uint16_t>  proto_server_capabilities(p.server_capabilities);
 | 
						|
  Protocol_chunk<boost::uint8_t>   proto_server_language(p.server_language);
 | 
						|
  Protocol_chunk<boost::uint16_t>  proto_server_status(p.server_status);
 | 
						|
  Protocol_chunk<boost::uint8_t>   proto_filler2(filler2,13);
 | 
						|
  Protocol_chunk<boost::uint8_t>   proto_scramble_buffer2(p.scramble_buff2, 13);
 | 
						|
 | 
						|
  is  >> proto_protocol_version
 | 
						|
      >> p.server_version_str
 | 
						|
      >> proto_thread_id
 | 
						|
      >> proto_scramble_buffer
 | 
						|
      >> proto_filler
 | 
						|
      >> proto_server_capabilities
 | 
						|
      >> proto_server_language
 | 
						|
      >> proto_server_status
 | 
						|
      >> proto_filler2
 | 
						|
      >> proto_scramble_buffer2;
 | 
						|
 | 
						|
  //assert(filler == 0);
 | 
						|
 | 
						|
  int remaining_bytes= packet_length - 9+13+13+8;
 | 
						|
  boost::uint8_t extention_buffer[remaining_bytes];
 | 
						|
  if (remaining_bytes > 0)
 | 
						|
  {
 | 
						|
    Protocol_chunk<boost::uint8_t> proto_extension(extention_buffer, remaining_bytes);
 | 
						|
    is >> proto_extension;
 | 
						|
  }
 | 
						|
 | 
						|
  //std::copy(&extention_buffer[0],&extention_buffer[remaining_bytes],std::ostream_iterator<char>(std::cout,","));
 | 
						|
}
 | 
						|
 | 
						|
void write_packet_header(char *buff, boost::uint16_t size, boost::uint8_t packet_no)
 | 
						|
{
 | 
						|
  int3store(buff, size);
 | 
						|
  buff[3]= (char)packet_no;
 | 
						|
}
 | 
						|
 | 
						|
 | 
						|
buffer_source &operator>>(buffer_source &src, Protocol &chunk)
 | 
						|
{
 | 
						|
  char ch;
 | 
						|
  int ct= 0;
 | 
						|
  char *ptr= (char*)chunk.data();
 | 
						|
 | 
						|
  while(ct < chunk.size() && src.m_ptr < src.m_size)
 | 
						|
  {
 | 
						|
    ptr[ct]= src.m_src[src.m_ptr];
 | 
						|
    ++ct;
 | 
						|
    ++src.m_ptr;
 | 
						|
  }
 | 
						|
  return src;
 | 
						|
}
 | 
						|
 | 
						|
std::istream &operator>>(std::istream &is, Protocol &chunk)
 | 
						|
{
 | 
						|
 if (chunk.is_length_encoded_binary())
 | 
						|
  {
 | 
						|
    int ct= 0;
 | 
						|
    is.read((char *)chunk.data(),1);
 | 
						|
    unsigned char byte= *(unsigned char *)chunk.data();
 | 
						|
    if (byte < 250)
 | 
						|
    {
 | 
						|
      chunk.collapse_size(1);
 | 
						|
      return is;
 | 
						|
    }
 | 
						|
    else if (byte == 251)
 | 
						|
    {
 | 
						|
      // is this a row data packet? if so, then this column value is NULL
 | 
						|
      chunk.collapse_size(1);
 | 
						|
      ct= 1;
 | 
						|
    }
 | 
						|
    else if (byte == 252)
 | 
						|
    {
 | 
						|
      chunk.collapse_size(2);
 | 
						|
      ct= 1;
 | 
						|
    }
 | 
						|
    else if(byte == 253)
 | 
						|
    {
 | 
						|
      chunk.collapse_size(3);
 | 
						|
      ct= 1;
 | 
						|
    }
 | 
						|
 | 
						|
    /* Read remaining bytes */
 | 
						|
    //is.read((char *)chunk.data(), chunk.size()-1);
 | 
						|
    char ch;
 | 
						|
    char *ptr= (char*)chunk.data();
 | 
						|
    while(ct < chunk.size())
 | 
						|
    {
 | 
						|
      is.get(ch);
 | 
						|
      ptr[ct]= ch;
 | 
						|
      ++ct;
 | 
						|
    }
 | 
						|
  }
 | 
						|
  else
 | 
						|
  {
 | 
						|
    char ch;
 | 
						|
    int ct= 0;
 | 
						|
    char *ptr= (char*)chunk.data();
 | 
						|
    int sz= chunk.size();
 | 
						|
    while(ct < sz)
 | 
						|
    {
 | 
						|
      is.get(ch);
 | 
						|
      ptr[ct]= ch;
 | 
						|
      ++ct;
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  return is;
 | 
						|
}
 | 
						|
 | 
						|
std::istream &operator>>(std::istream &is, std::string &str)
 | 
						|
{
 | 
						|
  std::ostringstream out;
 | 
						|
  char ch;
 | 
						|
  int ct= 0;
 | 
						|
  do
 | 
						|
  {
 | 
						|
    is.get(ch);
 | 
						|
    out.put(ch);
 | 
						|
    ++ct;
 | 
						|
  } while (is.good() && ch != '\0');
 | 
						|
  str.append(out.str());
 | 
						|
  return is;
 | 
						|
}
 | 
						|
 | 
						|
std::istream &operator>>(std::istream &is, Protocol_chunk_string &str)
 | 
						|
{
 | 
						|
  char ch;
 | 
						|
  int ct= 0;
 | 
						|
  int sz= str.m_str->size();
 | 
						|
  for (ct=0; ct< sz && is.good(); ct++)
 | 
						|
  {
 | 
						|
    is.get(ch);
 | 
						|
    str.m_str->at(ct)= ch;
 | 
						|
  }
 | 
						|
  
 | 
						|
  return is;
 | 
						|
}
 | 
						|
 | 
						|
std::istream &operator>>(std::istream &is, Protocol_chunk_string_len &lenstr)
 | 
						|
{
 | 
						|
  boost::uint8_t len;
 | 
						|
  std::string *str= lenstr.m_storage;
 | 
						|
  Protocol_chunk<boost::uint8_t> proto_str_len(len);
 | 
						|
  is >> proto_str_len;
 | 
						|
  Protocol_chunk_string   proto_str(*str, len);
 | 
						|
  is >> proto_str;
 | 
						|
  return is;
 | 
						|
}
 | 
						|
 | 
						|
std::ostream &operator<<(std::ostream &os, Protocol &chunk)
 | 
						|
{
 | 
						|
  if (!os.bad())
 | 
						|
    os.write((const char *) chunk.data(),chunk.size());
 | 
						|
  return os;
 | 
						|
}
 | 
						|
 | 
						|
Query_event *proto_query_event(std::istream &is, Log_event_header *header)
 | 
						|
{
 | 
						|
  boost::uint8_t db_name_len;
 | 
						|
  boost::uint16_t var_size;
 | 
						|
  // Length of query stored in the payload.
 | 
						|
  boost::uint32_t query_len;
 | 
						|
  Query_event *qev=new Query_event(header);
 | 
						|
 | 
						|
  Protocol_chunk<boost::uint32_t> proto_query_event_thread_id(qev->thread_id);
 | 
						|
  Protocol_chunk<boost::uint32_t> proto_query_event_exec_time(qev->exec_time);
 | 
						|
  Protocol_chunk<boost::uint8_t> proto_query_event_db_name_len(db_name_len);
 | 
						|
  Protocol_chunk<boost::uint16_t> proto_query_event_error_code(qev->error_code);
 | 
						|
  Protocol_chunk<boost::uint16_t> proto_query_event_var_size(var_size);
 | 
						|
 | 
						|
  is >> proto_query_event_thread_id
 | 
						|
     >> proto_query_event_exec_time
 | 
						|
     >> proto_query_event_db_name_len
 | 
						|
     >> proto_query_event_error_code
 | 
						|
     >> proto_query_event_var_size;
 | 
						|
 | 
						|
  //TODO : Implement it in a better way.
 | 
						|
 | 
						|
  /*
 | 
						|
    Query length =
 | 
						|
    Total event length (header->event_length) -
 | 
						|
      (
 | 
						|
        (LOG_EVENT_HEADER_SIZE - 1) +  //Shouldn't LOG_EVENT_HEADER_SIZE=19?
 | 
						|
        Thread-id (pre-defined, 4) +
 | 
						|
        Execution time (pre-defined, 4) +
 | 
						|
        Placeholder to store database length (pre-defined, 1) +
 | 
						|
        Error code (pre-defined, 2) +
 | 
						|
        Placeholder to store length taken by status variable blk (pre-defined, 2) +
 | 
						|
        Status variable block length (calculated, var_size) +
 | 
						|
        Database name length (calculated, db_name_len) +
 | 
						|
        Null terninator (pre-defined, 1) +
 | 
						|
    )
 | 
						|
 | 
						|
    which gives :
 | 
						|
  */
 | 
						|
 | 
						|
  query_len= header->event_length - (LOG_EVENT_HEADER_SIZE + 13 + var_size +
 | 
						|
                                     db_name_len);
 | 
						|
 | 
						|
  qev->variables.reserve(var_size);
 | 
						|
  Protocol_chunk_vector proto_payload(qev->variables, var_size);
 | 
						|
  is >> proto_payload;
 | 
						|
 | 
						|
  Protocol_chunk_string proto_query_event_db_name(qev->db_name,
 | 
						|
                                                  (unsigned long)db_name_len);
 | 
						|
 | 
						|
  Protocol_chunk_string proto_query_event_query_str
 | 
						|
    (qev->query, (unsigned long)query_len);
 | 
						|
 | 
						|
  char zero_marker; // should always be 0;
 | 
						|
  is >> proto_query_event_db_name
 | 
						|
     >> zero_marker
 | 
						|
     >> proto_query_event_query_str;
 | 
						|
  // Following is not really required now,
 | 
						|
  //qev->query.resize(qev->query.size() - 1); // Last character is a '\0' character.
 | 
						|
 | 
						|
  return qev;
 | 
						|
}
 | 
						|
 | 
						|
Gtid_event *proto_gtid_event(std::istream &is, Log_event_header *header)
 | 
						|
{
 | 
						|
  Gtid_event *gev=new Gtid_event(header);
 | 
						|
  boost::uint32_t gtid_length=0;
 | 
						|
 | 
						|
  if (header->type_code == GTID_EVENT_MARIADB) {
 | 
						|
	  Protocol_chunk<boost::uint32_t> proto_gtid_event_domain_id(gev->domain_id);
 | 
						|
	  gev->server_id = header->server_id;
 | 
						|
	  Protocol_chunk<boost::uint64_t> proto_gtid_event_sequence_number(gev->sequence_number);
 | 
						|
 | 
						|
	  // In MariaDB GTIDs are just sequence number followed by domain id
 | 
						|
	  is >> proto_gtid_event_sequence_number
 | 
						|
	     >> proto_gtid_event_domain_id;
 | 
						|
	  gev->m_gtid= Gtid(gev->domain_id, gev->server_id, gev->sequence_number);
 | 
						|
  } else {
 | 
						|
	  // In MySQL GTIDs consists two parts SID and global sequence
 | 
						|
	  // number. SID is stored in encoded format, we will not try to
 | 
						|
	  // understand that. Global sequence number is more meaningfull.
 | 
						|
          unsigned char gtid_data[MYSQL_GTID_ENCODED_SIZE+1];
 | 
						|
	  memset(gtid_data, 0, MYSQL_GTID_ENCODED_SIZE+1);
 | 
						|
	  is.read((char *)gtid_data, MYSQL_GTID_ENCODED_SIZE);
 | 
						|
	  unsigned char *buf = gtid_data;
 | 
						|
	  buf++; // commit flag, ignore
 | 
						|
	  memcpy(gev->m_mysql_gtid, (char *)buf, MYSQL_GTID_ENCODED_SIZE);
 | 
						|
	  gev->sequence_number = uint8korr(buf+16);
 | 
						|
 | 
						|
	  gev->m_gtid= Gtid(gev->m_mysql_gtid, gev->sequence_number);
 | 
						|
  }
 | 
						|
 | 
						|
  return gev;
 | 
						|
}
 | 
						|
 | 
						|
Rotate_event *proto_rotate_event(std::istream &is, Log_event_header *header)
 | 
						|
{
 | 
						|
  Rotate_event *rev= new Rotate_event(header);
 | 
						|
 | 
						|
  boost::uint32_t file_name_length= header->event_length - 7 - LOG_EVENT_HEADER_SIZE;
 | 
						|
 | 
						|
  Protocol_chunk<boost::uint64_t > prot_position(rev->binlog_pos);
 | 
						|
  Protocol_chunk_string prot_file_name(rev->binlog_file, file_name_length);
 | 
						|
  is >> prot_position
 | 
						|
     >> prot_file_name;
 | 
						|
 | 
						|
  return rev;
 | 
						|
}
 | 
						|
 | 
						|
Incident_event *proto_incident_event(std::istream &is, Log_event_header *header)
 | 
						|
{
 | 
						|
  Incident_event *incident= new Incident_event(header);
 | 
						|
  Protocol_chunk<boost::uint8_t> proto_incident_code(incident->type);
 | 
						|
  Protocol_chunk_string_len      proto_incident_message(incident->message);
 | 
						|
 | 
						|
  is >> proto_incident_code
 | 
						|
     >> proto_incident_message;
 | 
						|
 | 
						|
  return incident;
 | 
						|
}
 | 
						|
 | 
						|
Row_event *proto_rows_event(std::istream &is, Log_event_header *header)
 | 
						|
{
 | 
						|
  Row_event *rev=new Row_event(header);
 | 
						|
 | 
						|
  union
 | 
						|
  {
 | 
						|
    boost::uint64_t integer;
 | 
						|
    boost::uint8_t bytes[6];
 | 
						|
  } table_id;
 | 
						|
 | 
						|
  table_id.integer=0L;
 | 
						|
  Protocol_chunk<boost::uint8_t>  proto_table_id(&table_id.bytes[0], 6);
 | 
						|
  Protocol_chunk<boost::uint16_t> proto_flags(rev->flags);
 | 
						|
  Protocol_chunk<boost::uint64_t> proto_column_len(rev->columns_len);
 | 
						|
  proto_column_len.set_length_encoded_binary(true);
 | 
						|
 | 
						|
  is >> proto_table_id
 | 
						|
     >> proto_flags
 | 
						|
     >> proto_column_len;
 | 
						|
 | 
						|
  rev->table_id=table_id.integer;
 | 
						|
  int used_column_len=(int) ((rev->columns_len + 7) / 8);
 | 
						|
  Protocol_chunk_vector proto_used_columns(rev->used_columns, used_column_len);
 | 
						|
  rev->null_bits_len= used_column_len;
 | 
						|
 | 
						|
  is >> proto_used_columns;
 | 
						|
 | 
						|
  if (header->type_code == UPDATE_ROWS_EVENT)
 | 
						|
  {
 | 
						|
    Protocol_chunk_vector proto_columns_before_image(rev->columns_before_image, used_column_len);
 | 
						|
    is >> proto_columns_before_image;
 | 
						|
  }
 | 
						|
 | 
						|
  int bytes_read=proto_table_id.size() + proto_flags.size() + proto_column_len.size() + used_column_len;
 | 
						|
  if (header->type_code == UPDATE_ROWS_EVENT)
 | 
						|
    bytes_read+=used_column_len;
 | 
						|
 | 
						|
  unsigned long row_len= header->event_length - bytes_read - LOG_EVENT_HEADER_SIZE + 1;
 | 
						|
  //std::cout << "Bytes read: " << bytes_read << " Bytes expected: " << rev->row_len << std::endl;
 | 
						|
  Protocol_chunk_vector proto_row(rev->row, row_len);
 | 
						|
  is >> proto_row;
 | 
						|
 | 
						|
  return rev;
 | 
						|
}
 | 
						|
 | 
						|
Int_var_event *proto_intvar_event(std::istream &is, Log_event_header *header)
 | 
						|
{
 | 
						|
  Int_var_event *event= new Int_var_event(header);
 | 
						|
 | 
						|
  Protocol_chunk<boost::uint8_t>  proto_type(event->type);
 | 
						|
  Protocol_chunk<boost::uint64_t> proto_value(event->value);
 | 
						|
  is >> proto_type
 | 
						|
     >> proto_value;
 | 
						|
 | 
						|
  return event;
 | 
						|
}
 | 
						|
 | 
						|
User_var_event *proto_uservar_event(std::istream &is, Log_event_header *header)
 | 
						|
{
 | 
						|
  User_var_event *event= new User_var_event(header);
 | 
						|
 | 
						|
  boost::uint32_t name_len;
 | 
						|
  Protocol_chunk<boost::uint32_t> proto_name_len(name_len);
 | 
						|
 | 
						|
  is >> proto_name_len;
 | 
						|
 | 
						|
  Protocol_chunk_string proto_name(event->name, name_len);
 | 
						|
  Protocol_chunk<boost::uint8_t>  proto_null(event->is_null);
 | 
						|
 | 
						|
  is >> proto_name >> proto_null;
 | 
						|
  if (event->is_null)
 | 
						|
  {
 | 
						|
    event->type = User_var_event::STRING_TYPE;
 | 
						|
    event->charset = 63;                        // Binary charset
 | 
						|
  }
 | 
						|
  else
 | 
						|
  {
 | 
						|
    boost::uint32_t value_len;
 | 
						|
    Protocol_chunk<boost::uint8_t> proto_type(event->type);
 | 
						|
    Protocol_chunk<boost::uint32_t> proto_charset(event->charset);
 | 
						|
    Protocol_chunk<boost::uint32_t> proto_val_len(value_len);
 | 
						|
    is >> proto_type >> proto_charset >> proto_val_len;
 | 
						|
    Protocol_chunk_string proto_value(event->value, value_len);
 | 
						|
    is >> proto_value;
 | 
						|
  }
 | 
						|
 | 
						|
  return event;
 | 
						|
}
 | 
						|
 | 
						|
Table_map_event *proto_table_map_event(std::istream &is, Log_event_header *header)
 | 
						|
{
 | 
						|
  Table_map_event *tmev=new Table_map_event(header);
 | 
						|
  boost::uint64_t columns_len= 0;
 | 
						|
  boost::uint64_t metadata_len= 0;
 | 
						|
  union
 | 
						|
  {
 | 
						|
    boost::uint64_t integer;
 | 
						|
    boost::uint8_t bytes[6];
 | 
						|
  } table_id;
 | 
						|
  char zero_marker= 0;
 | 
						|
 | 
						|
  table_id.integer=0L;
 | 
						|
  Protocol_chunk<boost::uint8_t> proto_table_id(&table_id.bytes[0], 6);
 | 
						|
  Protocol_chunk<boost::uint16_t> proto_flags(tmev->flags);
 | 
						|
  Protocol_chunk_string_len proto_db_name(tmev->db_name);
 | 
						|
  Protocol_chunk<boost::uint8_t> proto_marker(zero_marker); // Should be '\0'
 | 
						|
  Protocol_chunk_string_len proto_table_name(tmev->table_name);
 | 
						|
  Protocol_chunk<boost::uint64_t> proto_columns_len(columns_len);
 | 
						|
  proto_columns_len.set_length_encoded_binary(true);
 | 
						|
 | 
						|
  is >> proto_table_id
 | 
						|
     >> proto_flags
 | 
						|
     >> proto_db_name
 | 
						|
     >> proto_marker
 | 
						|
     >> proto_table_name
 | 
						|
     >> proto_marker
 | 
						|
     >> proto_columns_len;
 | 
						|
  tmev->table_id=table_id.integer;
 | 
						|
  Protocol_chunk_vector proto_columns(tmev->columns, columns_len);
 | 
						|
  Protocol_chunk<boost::uint64_t> proto_metadata_len(metadata_len);
 | 
						|
  proto_metadata_len.set_length_encoded_binary(true);
 | 
						|
 | 
						|
  is >> proto_columns
 | 
						|
     >> proto_metadata_len;
 | 
						|
  Protocol_chunk_vector proto_metadata(tmev->metadata, (unsigned long)metadata_len);
 | 
						|
  is >> proto_metadata;
 | 
						|
  unsigned long null_bits_len=(int) ((tmev->columns.size() + 7) / 8);
 | 
						|
 | 
						|
  Protocol_chunk_vector proto_null_bits(tmev->null_bits, null_bits_len);
 | 
						|
 | 
						|
  is >> proto_null_bits;
 | 
						|
  return tmev;
 | 
						|
}
 | 
						|
 | 
						|
std::istream &operator>>(std::istream &is, Protocol_chunk_vector &chunk)
 | 
						|
{
 | 
						|
  unsigned long size= chunk.m_size;
 | 
						|
  for(int i=0; i< size; i++)
 | 
						|
  {
 | 
						|
    char ch;
 | 
						|
    is.get(ch);
 | 
						|
    chunk.m_vec->push_back(ch);
 | 
						|
  }
 | 
						|
  return is;
 | 
						|
}
 | 
						|
 | 
						|
 | 
						|
} } // end namespace mysql::system
 |