Remove directory replication_listener

Old stuff, not maintained and not used by anything or anybody.
This commit is contained in:
Johan Wikman 2016-03-09 09:06:09 +02:00
parent 2486baffea
commit c3bacba5ef
34 changed files with 0 additions and 6467 deletions

View File

@ -1,48 +0,0 @@
project (mariadb-replication-listener-api)
cmake_minimum_required(VERSION 2.6)
# This configuration file builds both the static and shared version of
# the library.
set(replication_sources
access_method_factory.cpp
binlog_driver.cpp tcp_driver.cpp basic_content_handler.cpp
binary_log.cpp protocol.cpp binlog_event.cpp
gtid.cpp resultset_iterator.cpp value.cpp row_of_fields.cpp)
# Find MySQL client library and header files
find_library(MySQL_LIBRARY NAMES libmysqld.a PATHS
/usr/lib64/mysql /usr/lib/mysql /usr/local/mysql/lib ${MARIADB_SRC_PATH}/lib)
SET(Boost_DEBUG FALSE)
SET(Boost_FIND_REQUIRED TRUE)
SET(Boost_FIND_QUIETLY TRUE)
SET(Boost_USE_STATIC_LIBS FALSE)
SET(Boost_ADDITIONAL_VERSIONS "1.41" "1.41.0")
FIND_PACKAGE(Boost REQUIRED system thread)
FIND_LIBRARY(LIB_CRYPTO NAMES libcrypto.a /opt/local/lib /opt/lib /usr/lib /usr/local/lib /usr/local/ssl/lib)
LINK_DIRECTORIES(${LIB_CRYPTO})
LINK_DIRECTORIES(${Boost_LIBRARY_DIRS})
INCLUDE_DIRECTORIES(${Boost_INCLUDE_DIR})
include_directories(${MARIADB_SRC_PATH})
include_directories(/usr/local/mysql/include)
include_directories(../../utils)
include_directories(.)
# Configure for building static library
add_library(replication_static STATIC ${replication_sources})
target_link_libraries(replication_static ${CYPTO} ${Boost_LIBRARIES} ${MySQL_LIBRARY})
set_target_properties(replication_static PROPERTIES
OUTPUT_NAME "replication")
# Configure for building shared library
add_library(replication_shared SHARED ${replication_sources})
target_link_libraries(replication_shared ${CYPTO} ${Boost_LIBRARIES} ${MySQL_LIBRARY})
set_target_properties(replication_shared PROPERTIES
VERSION 0.1 SOVERSION 1
OUTPUT_NAME "replication")
install(TARGETS replication_shared LIBRARY DESTINATION lib)
install(TARGETS replication_static ARCHIVE DESTINATION lib)

View File

@ -1,280 +0,0 @@
GNU GENERAL PUBLIC LICENSE
Version 2, June 1991
Copyright (C) 1989, 1991 Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
Everyone is permitted to copy and distribute verbatim copies
of this license document, but changing it is not allowed.
Preamble
The licenses for most software are designed to take away your
freedom to share and change it. By contrast, the GNU General Public
License is intended to guarantee your freedom to share and change free
software--to make sure the software is free for all its users. This
General Public License applies to most of the Free Software
Foundation's software and to any other program whose authors commit to
using it. (Some other Free Software Foundation software is covered by
the GNU Lesser General Public License instead.) You can apply it to
your programs, too.
When we speak of free software, we are referring to freedom, not
price. Our General Public Licenses are designed to make sure that you
have the freedom to distribute copies of free software (and charge for
this service if you wish), that you receive source code or can get it
if you want it, that you can change the software or use pieces of it
in new free programs; and that you know you can do these things.
To protect your rights, we need to make restrictions that forbid
anyone to deny you these rights or to ask you to surrender the rights.
These restrictions translate to certain responsibilities for you if you
distribute copies of the software, or if you modify it.
For example, if you distribute copies of such a program, whether
gratis or for a fee, you must give the recipients all the rights that
you have. You must make sure that they, too, receive or can get the
source code. And you must show them these terms so they know their
rights.
We protect your rights with two steps: (1) copyright the software, and
(2) offer you this license which gives you legal permission to copy,
distribute and/or modify the software.
Also, for each author's protection and ours, we want to make certain
that everyone understands that there is no warranty for this free
software. If the software is modified by someone else and passed on, we
want its recipients to know that what they have is not the original, so
that any problems introduced by others will not reflect on the original
authors' reputations.
Finally, any free program is threatened constantly by software
patents. We wish to avoid the danger that redistributors of a free
program will individually obtain patent licenses, in effect making the
program proprietary. To prevent this, we have made it clear that any
patent must be licensed for everyone's free use or not licensed at all.
The precise terms and conditions for copying, distribution and
modification follow.
GNU GENERAL PUBLIC LICENSE
TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION
0. This License applies to any program or other work which contains
a notice placed by the copyright holder saying it may be distributed
under the terms of this General Public License. The "Program", below,
refers to any such program or work, and a "work based on the Program"
means either the Program or any derivative work under copyright law:
that is to say, a work containing the Program or a portion of it,
either verbatim or with modifications and/or translated into another
language. (Hereinafter, translation is included without limitation in
the term "modification".) Each licensee is addressed as "you".
Activities other than copying, distribution and modification are not
covered by this License; they are outside its scope. The act of
running the Program is not restricted, and the output from the Program
is covered only if its contents constitute a work based on the
Program (independent of having been made by running the Program).
Whether that is true depends on what the Program does.
1. You may copy and distribute verbatim copies of the Program's
source code as you receive it, in any medium, provided that you
conspicuously and appropriately publish on each copy an appropriate
copyright notice and disclaimer of warranty; keep intact all the
notices that refer to this License and to the absence of any warranty;
and give any other recipients of the Program a copy of this License
along with the Program.
You may charge a fee for the physical act of transferring a copy, and
you may at your option offer warranty protection in exchange for a fee.
2. You may modify your copy or copies of the Program or any portion
of it, thus forming a work based on the Program, and copy and
distribute such modifications or work under the terms of Section 1
above, provided that you also meet all of these conditions:
a) You must cause the modified files to carry prominent notices
stating that you changed the files and the date of any change.
b) You must cause any work that you distribute or publish, that in
whole or in part contains or is derived from the Program or any
part thereof, to be licensed as a whole at no charge to all third
parties under the terms of this License.
c) If the modified program normally reads commands interactively
when run, you must cause it, when started running for such
interactive use in the most ordinary way, to print or display an
announcement including an appropriate copyright notice and a
notice that there is no warranty (or else, saying that you provide
a warranty) and that users may redistribute the program under
these conditions, and telling the user how to view a copy of this
License. (Exception: if the Program itself is interactive but
does not normally print such an announcement, your work based on
the Program is not required to print an announcement.)
These requirements apply to the modified work as a whole. If
identifiable sections of that work are not derived from the Program,
and can be reasonably considered independent and separate works in
themselves, then this License, and its terms, do not apply to those
sections when you distribute them as separate works. But when you
distribute the same sections as part of a whole which is a work based
on the Program, the distribution of the whole must be on the terms of
this License, whose permissions for other licensees extend to the
entire whole, and thus to each and every part regardless of who wrote it.
Thus, it is not the intent of this section to claim rights or contest
your rights to work written entirely by you; rather, the intent is to
exercise the right to control the distribution of derivative or
collective works based on the Program.
In addition, mere aggregation of another work not based on the Program
with the Program (or with a work based on the Program) on a volume of
a storage or distribution medium does not bring the other work under
the scope of this License.
3. You may copy and distribute the Program (or a work based on it,
under Section 2) in object code or executable form under the terms of
Sections 1 and 2 above provided that you also do one of the following:
a) Accompany it with the complete corresponding machine-readable
source code, which must be distributed under the terms of Sections
1 and 2 above on a medium customarily used for software interchange; or,
b) Accompany it with a written offer, valid for at least three
years, to give any third party, for a charge no more than your
cost of physically performing source distribution, a complete
machine-readable copy of the corresponding source code, to be
distributed under the terms of Sections 1 and 2 above on a medium
customarily used for software interchange; or,
c) Accompany it with the information you received as to the offer
to distribute corresponding source code. (This alternative is
allowed only for noncommercial distribution and only if you
received the program in object code or executable form with such
an offer, in accord with Subsection b above.)
The source code for a work means the preferred form of the work for
making modifications to it. For an executable work, complete source
code means all the source code for all modules it contains, plus any
associated interface definition files, plus the scripts used to
control compilation and installation of the executable. However, as a
special exception, the source code distributed need not include
anything that is normally distributed (in either source or binary
form) with the major components (compiler, kernel, and so on) of the
operating system on which the executable runs, unless that component
itself accompanies the executable.
If distribution of executable or object code is made by offering
access to copy from a designated place, then offering equivalent
access to copy the source code from the same place counts as
distribution of the source code, even though third parties are not
compelled to copy the source along with the object code.
4. You may not copy, modify, sublicense, or distribute the Program
except as expressly provided under this License. Any attempt
otherwise to copy, modify, sublicense or distribute the Program is
void, and will automatically terminate your rights under this License.
However, parties who have received copies, or rights, from you under
this License will not have their licenses terminated so long as such
parties remain in full compliance.
5. You are not required to accept this License, since you have not
signed it. However, nothing else grants you permission to modify or
distribute the Program or its derivative works. These actions are
prohibited by law if you do not accept this License. Therefore, by
modifying or distributing the Program (or any work based on the
Program), you indicate your acceptance of this License to do so, and
all its terms and conditions for copying, distributing or modifying
the Program or works based on it.
6. Each time you redistribute the Program (or any work based on the
Program), the recipient automatically receives a license from the
original licensor to copy, distribute or modify the Program subject to
these terms and conditions. You may not impose any further
restrictions on the recipients' exercise of the rights granted herein.
You are not responsible for enforcing compliance by third parties to
this License.
7. If, as a consequence of a court judgment or allegation of patent
infringement or for any other reason (not limited to patent issues),
conditions are imposed on you (whether by court order, agreement or
otherwise) that contradict the conditions of this License, they do not
excuse you from the conditions of this License. If you cannot
distribute so as to satisfy simultaneously your obligations under this
License and any other pertinent obligations, then as a consequence you
may not distribute the Program at all. For example, if a patent
license would not permit royalty-free redistribution of the Program by
all those who receive copies directly or indirectly through you, then
the only way you could satisfy both it and this License would be to
refrain entirely from distribution of the Program.
If any portion of this section is held invalid or unenforceable under
any particular circumstance, the balance of the section is intended to
apply and the section as a whole is intended to apply in other
circumstances.
It is not the purpose of this section to induce you to infringe any
patents or other property right claims or to contest validity of any
such claims; this section has the sole purpose of protecting the
integrity of the free software distribution system, which is
implemented by public license practices. Many people have made
generous contributions to the wide range of software distributed
through that system in reliance on consistent application of that
system; it is up to the author/donor to decide if he or she is willing
to distribute software through any other system and a licensee cannot
impose that choice.
This section is intended to make thoroughly clear what is believed to
be a consequence of the rest of this License.
8. If the distribution and/or use of the Program is restricted in
certain countries either by patents or by copyrighted interfaces, the
original copyright holder who places the Program under this License
may add an explicit geographical distribution limitation excluding
those countries, so that distribution is permitted only in or among
countries not thus excluded. In such case, this License incorporates
the limitation as if written in the body of this License.
9. The Free Software Foundation may publish revised and/or new versions
of the General Public License from time to time. Such new versions will
be similar in spirit to the present version, but may differ in detail to
address new problems or concerns.
Each version is given a distinguishing version number. If the Program
specifies a version number of this License which applies to it and "any
later version", you have the option of following the terms and conditions
either of that version or of any later version published by the Free
Software Foundation. If the Program does not specify a version number of
this License, you may choose any version ever published by the Free Software
Foundation.
10. If you wish to incorporate parts of the Program into other free
programs whose distribution conditions are different, write to the author
to ask for permission. For software which is copyrighted by the Free
Software Foundation, write to the Free Software Foundation; we sometimes
make exceptions for this. Our decision will be guided by the two goals
of preserving the free status of all derivatives of our free software and
of promoting the sharing and reuse of software generally.
NO WARRANTY
11. BECAUSE THE PROGRAM IS LICENSED FREE OF CHARGE, THERE IS NO WARRANTY
FOR THE PROGRAM, TO THE EXTENT PERMITTED BY APPLICABLE LAW. EXCEPT WHEN
OTHERWISE STATED IN WRITING THE COPYRIGHT HOLDERS AND/OR OTHER PARTIES
PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED
OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE ENTIRE RISK AS
TO THE QUALITY AND PERFORMANCE OF THE PROGRAM IS WITH YOU. SHOULD THE
PROGRAM PROVE DEFECTIVE, YOU ASSUME THE COST OF ALL NECESSARY SERVICING,
REPAIR OR CORRECTION.
12. IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING
WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY AND/OR
REDISTRIBUTE THE PROGRAM AS PERMITTED ABOVE, BE LIABLE TO YOU FOR DAMAGES,
INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES ARISING
OUT OF THE USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT NOT LIMITED
TO LOSS OF DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY
YOU OR THIRD PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER
PROGRAMS), EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE
POSSIBILITY OF SUCH DAMAGES.
END OF TERMS AND CONDITIONS

View File

@ -1,30 +0,0 @@
Portions of this software contain modifications contributed by MariaDB Corporation, Ab.
These contributions are used with the following license:
Copyright (c) 2013, MariaDB Corporation Ab. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following
disclaimer in the documentation and/or other materials
provided with the distribution.
* Neither the name of the MariaDB Corporation Ab. nor the names of its
contributors may be used to endorse or promote products
derived from this software without specific prior written
permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@ -1,17 +0,0 @@
This code was forked from https://github.com/SponsorPay/mysql-replication-listener
Dependencies
------------
You need to have CMake version 2.8 or later and Boost version 1.35.0
or later since Asio is required.
Building
--------
To build the entire package, it is first necessary to run CMake to build all the makefiles.
cmake .
make -j4

View File

@ -1,122 +0,0 @@
/*
Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights
reserved.
Copyright (c) 2013, MariaDB Corporation Ab
Portions of this file contain modifications contributed and copyrighted by
MariaDB Corporation, Ab. Those modifications are gratefully acknowledged and are described
briefly in the source code.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
02110-1301 USA
*/
/*
MariaDB Corporation change details:
- Removed unnecessary file driver
Author: Jan Lindström (jan.lindstrom@mariadb.com
*/
#include "access_method_factory.h"
#include "tcp_driver.h"
using mysql::system::Binary_log_driver;
using mysql::system::Binlog_tcp_driver;
/**
Parse the body of a MySQL URI.
The format is <code>user[:password]@host[:port]</code>
*/
static Binary_log_driver *parse_mysql_url(const char *body, size_t len)
{
/* Find the beginning of the user name */
if (strncmp(body, "//", 2) != 0)
return 0;
/* Find the user name, which is mandatory */
const char *user = body + 2;
const char *user_end= strpbrk(user, ":@");
if (user_end == 0 || user_end == user)
return 0;
assert(user_end - user >= 1); // There has to be a username
/* Find the password, which can be empty */
assert(*user_end == ':' || *user_end == '@');
const char *const pass = user_end + 1; // Skip the ':' (or '@')
const char *pass_end = pass;
if (*user_end == ':')
{
pass_end = strchr(pass, '@');
if (pass_end == 0)
return 0; // There should be a password, but '@' was not found
}
assert(pass_end - pass >= 0); // Password can be empty
/* Find the host name, which is mandatory */
// Skip the '@', if there is one
const char *host = *pass_end == '@' ? pass_end + 1 : pass_end;
const char *host_end = strchr(host, ':');
if (host == host_end)
return 0; // No hostname was found
/* If no ':' was found there is no port, so the host end at the end
* of the string */
if (host_end == 0)
host_end = body + len;
assert(host_end - host >= 1); // There has to be a host
/* Find the port number */
unsigned long portno = 3306;
if (*host_end == ':')
portno = strtoul(host_end + 1, NULL, 10);
/* Host name is now the string [host, port-1) if port != NULL and [host, EOS) otherwise. */
/* Port number is stored in portno, either the default, or a parsed one */
return new Binlog_tcp_driver(std::string(user, user_end - user),
std::string(pass, pass_end - pass),
std::string(host, host_end - host),
portno);
}
/**
URI parser information.
*/
struct Parser {
const char* protocol;
Binary_log_driver *(*parser)(const char *body, size_t length);
};
/**
Array of schema names and matching parsers.
*/
static Parser url_parser[] = {
{ "mysql", parse_mysql_url },
};
Binary_log_driver *
mysql::system::create_transport(const char *url)
{
const char *pfx = strchr(url, ':');
if (pfx == 0)
return NULL;
for (int i = 0 ; i < sizeof(url_parser)/sizeof(*url_parser) ; ++i)
{
const char *proto = url_parser[i].protocol;
if (strncmp(proto, url, strlen(proto)) == 0)
return (*url_parser[i].parser)(pfx+1, strlen(pfx+1));
}
return NULL;
}

View File

@ -1,39 +0,0 @@
/*
Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights
reserved.
Copyright (c) 2013-2014, MariaDB Corporation Ab
Portions of this file contain modifications contributed and copyrighted by
MariaDB Corporation, Ab. Those modifications are gratefully acknowledged and are described
briefly in the source code.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
02110-1301 USA
*/
#ifndef _ACCESS_METHOD_FACTORY_H
#define _ACCESS_METHOD_FACTORY_H
#include "binlog_driver.h"
namespace mysql {
namespace system {
Binary_log_driver *create_transport(const char *url);
Binary_log_driver *parse_mysql_url(char *url, const char
*mysql_access_method);
}
}
#endif /* _ACCESS_METHOD_FACTORY_H */

View File

@ -1,120 +0,0 @@
/*
Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights
reserved.
Copyright (c) 2013, MariaDB Corporation Ab
Portions of this file contain modifications contributed and copyrighted by
MariaDB Corporation, Ab. Those modifications are gratefully acknowledged and are described
briefly in the source code.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
02110-1301 USA
*/
/*
MariaDB Corporation change details:
- Added GTID event handler
Author: Jan Lindström (jan.lindstrom@mariadb.com
*/
#include "basic_content_handler.h"
#include <boost/bind.hpp>
namespace mysql {
Content_handler::Content_handler () {}
Content_handler::~Content_handler () {}
mysql::Binary_log_event *Content_handler::process_event(mysql::Query_event *ev) { return ev; }
mysql::Binary_log_event *Content_handler::process_event(mysql::Row_event *ev) { return ev; }
mysql::Binary_log_event *Content_handler::process_event(mysql::Table_map_event *ev) { return ev; }
mysql::Binary_log_event *Content_handler::process_event(mysql::Xid *ev) { return ev; }
mysql::Binary_log_event *Content_handler::process_event(mysql::User_var_event *ev) { return ev; }
mysql::Binary_log_event *Content_handler::process_event(mysql::Incident_event *ev) { return ev; }
mysql::Binary_log_event *Content_handler::process_event(mysql::Rotate_event *ev) { return ev; }
mysql::Binary_log_event *Content_handler::process_event(mysql::Int_var_event *ev) { return ev; }
mysql::Binary_log_event *Content_handler::process_event(mysql::Binary_log_event *ev) { return ev; }
mysql::Binary_log_event *Content_handler::process_event(mysql::Gtid_event *ev) {return ev; }
Injection_queue *Content_handler::get_injection_queue(void)
{
return m_reinject_queue;
}
void Content_handler::set_injection_queue(Injection_queue *queue)
{
m_reinject_queue= queue;
}
mysql::Binary_log_event*
Content_handler::internal_process_event(mysql::Binary_log_event *ev)
{
mysql::Binary_log_event *processed_event= 0;
switch(ev->header ()->type_code) {
case mysql::QUERY_EVENT:
processed_event= process_event(static_cast<mysql::Query_event*>(ev));
break;
case mysql::GTID_EVENT_MARIADB:
case mysql::GTID_EVENT_MYSQL:
processed_event= process_event(static_cast<mysql::Gtid_event*>(ev));
break;
case mysql::WRITE_ROWS_EVENT:
case mysql::UPDATE_ROWS_EVENT:
case mysql::DELETE_ROWS_EVENT:
processed_event= process_event(static_cast<mysql::Row_event*>(ev));
break;
case mysql::USER_VAR_EVENT:
processed_event= process_event(static_cast<mysql::User_var_event *>(ev));
break;
case mysql::ROTATE_EVENT:
processed_event= process_event(static_cast<mysql::Rotate_event *>(ev));
break;
case mysql::INCIDENT_EVENT:
processed_event= process_event(static_cast<mysql::Incident_event *>(ev));
break;
case mysql::XID_EVENT:
processed_event= process_event(static_cast<mysql::Xid *>(ev));
break;
case mysql::TABLE_MAP_EVENT:
processed_event= process_event(static_cast<mysql::Table_map_event *>(ev));
break;
/* TODO ********************************************************************/
case mysql::FORMAT_DESCRIPTION_EVENT:
processed_event= process_event(ev);
break;
case mysql::BEGIN_LOAD_QUERY_EVENT:
processed_event= process_event(ev);
break;
case mysql::EXECUTE_LOAD_QUERY_EVENT:
processed_event= process_event(ev);
break;
case mysql::INTVAR_EVENT:
processed_event= process_event(ev);
break;
case mysql::STOP_EVENT:
processed_event= process_event(ev);
break;
case mysql::RAND_EVENT:
processed_event= process_event(ev);
break;
/****************************************************************************/
default:
processed_event= process_event(ev);
break;
}
return processed_event;
}
} // end namespace

View File

@ -1,94 +0,0 @@
/*
Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights
reserved.
Copyright (c) 2013-2014, MariaDB Corporation Ab
Portions of this file contain modifications contributed and copyrighted by
MariaDB Corporation, Ab. Those modifications are gratefully acknowledged and are described
briefly in the source code.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
02110-1301 USA
*/
/*
MariaDB Corporation change details:
- Added GTID event handler
Author: Jan Lindström (jan.lindstrom@mariadb.com
*/
#ifndef BASIC_CONTENT_HANDLER_H
#define BASIC_CONTENT_HANDLER_H
#include "binlog_event.h"
namespace mysql {
class Injection_queue : public std::list<mysql::Binary_log_event * >
{
public:
Injection_queue() : std::list<mysql::Binary_log_event * >() {}
~Injection_queue() {}
};
/**
* A content handler accepts an event and returns the same event,
* a new one or 0 (the event was consumed by the content handler).
* The default behaviour is to return the event unaffected.
* The generic event handler is used for events which aren't routed to
* a dedicated member function, user defined events being the most
* common case.
*/
class Content_handler {
public:
Content_handler();
Content_handler(const mysql::Content_handler& orig);
virtual ~Content_handler();
virtual mysql::Binary_log_event *process_event(mysql::Query_event *ev);
virtual mysql::Binary_log_event *process_event(mysql::Row_event *ev);
virtual mysql::Binary_log_event *process_event(mysql::Table_map_event *ev);
virtual mysql::Binary_log_event *process_event(mysql::Xid *ev);
virtual mysql::Binary_log_event *process_event(mysql::User_var_event *ev);
virtual mysql::Binary_log_event *process_event(mysql::Incident_event *ev);
virtual mysql::Binary_log_event *process_event(mysql::Rotate_event *ev);
virtual mysql::Binary_log_event *process_event(mysql::Int_var_event *ev);
virtual mysql::Binary_log_event *process_event(mysql::Gtid_event *ev);
/**
Process any event which hasn't been registered yet.
*/
virtual mysql::Binary_log_event *process_event(mysql::Binary_log_event *ev);
protected:
/**
* The Injection queue is emptied before any new event is pulled from
* the Binary_log_driver. Injected events will pass through all content
* handlers. The Injection_queue is a derived std::list.
*/
Injection_queue *get_injection_queue();
private:
Injection_queue *m_reinject_queue;
void set_injection_queue(Injection_queue *injection_queue);
mysql::Binary_log_event *internal_process_event(mysql::Binary_log_event *ev);
friend class Binary_log;
};
} // end namespace
#endif /* BASIC_CONTENT_HANDLER_H */

View File

@ -1,83 +0,0 @@
/*
Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights
reserved.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
02110-1301 USA
*/
#ifndef _BASIC_TRANSACTION_PARSER_H
#define _BASIC_TRANSACTION_PARSER_H
/*
TODO The Transaction_log_event and Basic_transaction_parser will be removed
from this library and replaced with a table map indexer instead which can be
used to retrive table names.
*/
#include <list>
#include <boost/cstdint.hpp>
#include "binlog_event.h"
#include "basic_content_handler.h"
#include <iostream>
namespace mysql {
typedef std::pair<boost::uint64_t, Binary_log_event *> Event_index_element;
typedef std::map<boost::uint64_t, Binary_log_event *> Int_to_Event_map;
class Transaction_log_event : public Binary_log_event
{
public:
Transaction_log_event() : Binary_log_event() {}
Transaction_log_event(Log_event_header *header) : Binary_log_event(header) {}
virtual ~Transaction_log_event();
Int_to_Event_map &table_map() { return m_table_map; }
/**
* Index for easier table name look up
*/
Int_to_Event_map m_table_map;
std::list<Binary_log_event *> m_events;
};
Transaction_log_event *create_transaction_log_event(void);
class Basic_transaction_parser : public mysql::Content_handler
{
public:
Basic_transaction_parser() : mysql::Content_handler()
{
m_transaction_state= NOT_IN_PROGRESS;
}
mysql::Binary_log_event *process_event(mysql::Query_event *ev);
mysql::Binary_log_event *process_event(mysql::Row_event *ev);
mysql::Binary_log_event *process_event(mysql::Table_map_event *ev);
mysql::Binary_log_event *process_event(mysql::Xid *ev);
mysql::Binary_log_event *process_event(mysql::Binary_log_event *ev) {return ev; }
mysql::Binary_log_event *process_event(mysql::Gtid_event *ev);
private:
boost::uint32_t m_start_time;
enum Transaction_states { STARTING, IN_PROGRESS, COMMITTING, NOT_IN_PROGRESS } ;
enum Transaction_states m_transaction_state;
std::list <mysql::Binary_log_event *> m_event_stack;
mysql::Binary_log_event *process_transaction_state(mysql::Binary_log_event *ev);
};
} // end namespace
#endif /* _BASIC_TRANSACTION_PARSER_H */

View File

@ -1,187 +0,0 @@
/*
Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights
reserved.
Copyright (c) 2013, MariaDB Corporation Ab
Portions of this file contain modifications contributed and copyrighted by
MariaDB Corporation, Ab. Those modifications are gratefully acknowledged and are described
briefly in the source code.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
02110-1301 USA
*/
/*
MariaDB Corporation change details:
- Added support for setting binlog position based on GTID
- Added support for MySQL and MariDB server types
Author: Jan Lindström (jan.lindstrom@mariadb.com
*/
#include <list>
#include "binlog_api.h"
#include <boost/foreach.hpp>
using namespace mysql;
using namespace mysql::system;
namespace mysql
{
/*
Return server type string.
*/
const char *mysql_server_type_str(mysql_server_types server_type)
{
switch(server_type) {
case MYSQL_SERVER_TYPE_MARIADB: return "MariaDB";
case MYSQL_SERVER_TYPE_MYSQL: return "MySQL";
default: return "Unknown";
}
}
Binary_log::Binary_log(Binary_log_driver *drv) : m_binlog_position(4), m_binlog_file(""), m_uri("")
{
if (drv == NULL)
{
m_driver= &m_dummy_driver;
}
else
m_driver= drv;
}
Binary_log::Binary_log(Binary_log_driver *drv, std::string uri) : m_binlog_position(4), m_binlog_file(""), m_uri(uri)
{
if (drv == NULL)
{
m_driver= &m_dummy_driver;
}
else
m_driver= drv;
}
Content_handler_pipeline *Binary_log::content_handler_pipeline(void)
{
return &m_content_handlers;
}
int Binary_log::wait_for_next_event(mysql::Binary_log_event **event_ptr)
{
int rc;
bool handler_code;
mysql::Binary_log_event *event;
mysql::Injection_queue reinjection_queue;
do {
handler_code= false;
if (!reinjection_queue.empty())
{
event= reinjection_queue.front();
reinjection_queue.pop_front();
}
else
{
// Return in case of non-ERR_OK.
if(rc= m_driver->wait_for_next_event(&event))
return rc;
}
m_binlog_position= event->header()->next_position;
mysql::Content_handler *handler;
BOOST_FOREACH(handler, m_content_handlers)
{
if (event)
{
handler->set_injection_queue(&reinjection_queue);
event= handler->internal_process_event(event);
}
}
} while(event == 0 || !reinjection_queue.empty());
if (event_ptr)
*event_ptr= event;
return 0;
}
int Binary_log::set_position(const std::string &filename, unsigned long position)
{
int status= m_driver->set_position(filename, position);
if (status == ERR_OK)
{
m_binlog_file= filename;
m_binlog_position= position;
}
return status;
}
int Binary_log::set_position(unsigned long position)
{
std::string filename;
m_driver->get_position(&filename, NULL);
return this->set_position(filename, position);
}
int Binary_log::set_position_gtid(const Gtid gtid)
{
return this->set_position_gtid(gtid);
}
unsigned long Binary_log::get_position(void)
{
return m_binlog_position;
}
unsigned long Binary_log::get_position(std::string &filename)
{
m_driver->get_position(&m_binlog_file, &m_binlog_position);
filename= m_binlog_file;
return m_binlog_position;
}
int Binary_log::connect()
{
return m_driver->connect();
}
int Binary_log::connect(const boost::uint64_t binlog_pos)
{
return m_driver->connect(binlog_pos);
}
int Binary_log::connect(const Gtid gtid)
{
return m_driver->connect(gtid);
}
mysql_server_types Binary_log::get_mysql_server_type() const
{
return m_driver->get_mysql_server_type();
}
const char *Binary_log::get_mysql_server_type_str() const
{
return mysql_server_type_str(get_mysql_server_type());
}
void Binary_log::shutdown()
{
m_driver->shutdown();
}
}

View File

@ -1,192 +0,0 @@
/*
Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights
reserved.
Copyright (c) 2013-2014, MariaDB Corporation Ab
Portions of this file contain modifications contributed and copyrighted by
MariaDB Corporation, Ab. Those modifications are gratefully acknowledged and are described
briefly in the source code.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
02110-1301 USA
*/
/*
MariaDB Corporation change details:
- Added support for setting binlog position based on GTID
- Added support for MySQL and MariDB server types
Author: Jan Lindström (jan.lindstrom@mariadb.com
*/
#ifndef _REPEVENT_H
#define _REPEVENT_H
#include <iosfwd>
#include <boost/iostreams/categories.hpp>
#include <boost/iostreams/positioning.hpp>
#include <boost/iostreams/concepts.hpp>
#include <boost/asio.hpp>
#include <boost/function.hpp>
#include <boost/bind.hpp>
#include <list>
#include <cassert>
#include "binlog_event.h"
#include "binlog_driver.h"
#include "tcp_driver.h"
#include "basic_content_handler.h"
#include "access_method_factory.h"
#include "gtid.h"
namespace io = boost::iostreams;
namespace mysql
{
/**
* Error codes.
*/
enum Error_code {
ERR_OK = 0, /* All OK */
ERR_EOF, /* End of file */
ERR_FAIL, /* Unspecified failure */
ERROR_CODE_COUNT
};
/**
* Returns true if the event is consumed
*/
typedef boost::function< bool (Binary_log_event *& )> Event_content_handler;
class Dummy_driver : public system::Binary_log_driver
{
public:
Dummy_driver() : Binary_log_driver("", 0) {}
virtual ~Dummy_driver() {}
virtual int connect() { return 1; }
virtual int connect(const Gtid gtid) { return 1; }
virtual int connect(const boost::uint64_t binlog_pos) { return 1;}
virtual int wait_for_next_event(mysql::Binary_log_event **event) {
return ERR_EOF;
}
virtual int set_position(const std::string &str, unsigned long position) {
return ERR_OK;
}
virtual int set_position_gtid(const Gtid gtid) {
return ERR_OK;
}
virtual int get_position(std::string *str, unsigned long *position) {
return ERR_OK;
}
virtual int fetch_server_version(const std::string& user,
const std::string& passwd,
const std::string& host,
long port)
{
return ERR_OK;
}
virtual void shutdown() {}
};
class Content_handler;
typedef std::list<Content_handler *> Content_handler_pipeline;
class Binary_log {
private:
system::Binary_log_driver *m_driver;
Dummy_driver m_dummy_driver;
Content_handler_pipeline m_content_handlers;
unsigned long m_binlog_position;
std::string m_binlog_file;
mysql_server_types m_server_type;
std::string m_uri;
public:
Binary_log(system::Binary_log_driver *drv);
Binary_log(system::Binary_log_driver *drv, std::string);
~Binary_log() {}
int connect();
int connect(const Gtid gtid);
int connect(const boost::uint64_t binlog_pos);
/**
* Blocking attempt to get the next binlog event from the stream
*/
int wait_for_next_event(Binary_log_event **event);
/**
* Inserts/removes content handlers in and out of the chain
* The Content_handler_pipeline is a derived std::list
*/
Content_handler_pipeline *content_handler_pipeline();
/**
* Set the binlog position (filename, position)
*
* @return Error_code
* @retval ERR_OK The position is updated.
* @retval ERR_EOF The position is out-of-range
* @retval >= ERR_CODE_COUNT An unspecified error occurred
*/
int set_position(const std::string &filename, unsigned long position);
/**
* Set the binlog position using current filename
* @param position Requested position
*
* @return Error_code
* @retval ERR_OK The position is updated.
* @retval ERR_EOF The position is out-of-range
* @retval >= ERR_CODE_COUNT An unspecified error occurred
*/
int set_position(unsigned long position);
int set_position_gtid(const Gtid gtid);
/**
* Fetch the binlog position for the current file
*/
unsigned long get_position(void);
/**
* Fetch the current active binlog file name.
* @param[out] filename
* TODO replace reference with a pointer.
* @return The file position
*/
unsigned long get_position(std::string &filename);
mysql_server_types get_mysql_server_type() const;
const char *get_mysql_server_type_str() const;
std::string get_url() const {return m_uri; }
void shutdown();
};
}
#endif /* _REPEVENT_H */

View File

@ -1,87 +0,0 @@
/*
Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights
reserved.
Copyright (c) 2013, MariaDB Corporation Ab
Portions of this file contain modifications contributed and copyrighted by
MariaDB Corporation, Ab. Those modifications are gratefully acknowledged and are described
briefly in the source code.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
02110-1301 USA
*/
/*
MariaDB Corporation change details:
- Added support for GTID event handling for both MySQL and MariaDB
Author: Jan Lindström (jan.lindstrom@mariadb.com
*/
#include "binlog_driver.h"
namespace mysql { namespace system {
Binary_log_event* Binary_log_driver::parse_event(std::istream &is,
Log_event_header *header)
{
Binary_log_event *parsed_event= 0;
switch (header->type_code) {
case TABLE_MAP_EVENT:
parsed_event= proto_table_map_event(is, header);
break;
case QUERY_EVENT:
parsed_event= proto_query_event(is, header);
break;
case GTID_EVENT_MARIADB:
case GTID_EVENT_MYSQL:
parsed_event= proto_gtid_event(is, header);
break;
case INCIDENT_EVENT:
parsed_event= proto_incident_event(is, header);
break;
case WRITE_ROWS_EVENT:
case UPDATE_ROWS_EVENT:
case DELETE_ROWS_EVENT:
parsed_event= proto_rows_event(is, header);
break;
case ROTATE_EVENT:
{
Rotate_event *rot= proto_rotate_event(is, header);
m_binlog_file_name= rot->binlog_file;
m_binlog_offset= (unsigned long)rot->binlog_pos;
parsed_event= rot;
}
break;
case INTVAR_EVENT:
parsed_event= proto_intvar_event(is, header);
break;
case USER_VAR_EVENT:
parsed_event= proto_uservar_event(is, header);
break;
default:
{
// Create a dummy driver.
parsed_event= new Binary_log_event(header);
}
}
return parsed_event;
}
}
}

View File

@ -1,119 +0,0 @@
/*
Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights
reserved.
Copyright (c) 2013-2014, MariaDB Corporation Ab
Portions of this file contain modifications contributed and copyrighted by
MariaDB Corporation, Ab. Those modifications are gratefully acknowledged and are described
briefly in the source code.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
02110-1301 USA
*/
/*
MariaDB Corporation change details:
- Added support for GTID event handling for both MySQL and MariaDB
- Added support for setting binlog position based on GTID
Author: Jan Lindström (jan.lindstrom@mariadb.com
*/
#ifndef _BINLOG_DRIVER_H
#define _BINLOG_DRIVER_H
#include "binlog_event.h"
#include "protocol.h"
#include "gtid.h"
namespace mysql {
namespace system {
class Binary_log_driver
{
public:
template <class FilenameT>
Binary_log_driver(const FilenameT& filename = FilenameT(), unsigned int offset = 0)
: m_binlog_file_name(filename), m_binlog_offset(offset), m_server_type(MYSQL_SERVER_TYPE_NA)
{
}
~Binary_log_driver() {}
/**
* Connect to the binary log using previously declared connection parameters
* @return Success or error code
* @retval 0 Success
* @retval >0 Error code (to be specified)
*/
virtual int connect(Gtid gtid)= 0;
virtual int connect() = 0;
virtual int connect(const boost::uint64_t binlog_pos) = 0;
/**
* Blocking attempt to get the next binlog event from the stream
* @param event [out] Pointer to a binary log event to be fetched.
*/
virtual int wait_for_next_event(mysql::Binary_log_event **event)= 0;
/**
* Set the reader position
* @param str The file name
* @param position The file position
*
* @return False on success and True if an error occurred.
*/
virtual int set_position(const std::string &str, unsigned long position)= 0;
virtual int set_position_gtid(const Gtid gtid) = 0;
/**
* Get the read position.
*
* @param[out] string_ptr Pointer to location where the filename will be stored.
* @param[out] position_ptr Pointer to location where the position will be stored.
*
* @retval 0 Success
* @retval >0 Error code
*/
virtual int get_position(std::string *filename_ptr, unsigned long *position_ptr) = 0;
virtual int fetch_server_version(const std::string& user,
const std::string& passwd,
const std::string& host,
long port) = 0;
virtual void shutdown() = 0;
Binary_log_event* parse_event(std::istream &sbuff, Log_event_header *header);
mysql_server_types get_mysql_server_type() const
{
return m_server_type;
}
protected:
/**
* Used each time the client reconnects to the server to specify an
* offset position.
*/
unsigned long m_binlog_offset;
std::string m_binlog_file_name;
mysql_server_types m_server_type;
};
} // namespace mysql::system
} // namespace mysql
#endif /* _BINLOG_DRIVER_H */

View File

@ -1,95 +0,0 @@
/*
Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights
reserved.
Copyright (c) 2013, MariaDB Corporation Ab
Portions of this file contain modifications contributed and copyrighted by
MariaDB Corporation, Ab. Those modifications are gratefully acknowledged and are described
briefly in the source code.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
02110-1301 USA
*/
/*
MariaDB Corporation change details:
- Added support for GTID event handling for both MySQL and MariaDB
Author: Jan Lindström (jan.lindstrom@mariadb.com
*/
#include "binlog_event.h"
#include <iostream>
namespace mysql
{
namespace system {
const char *get_event_type_str(Log_event_type type)
{
switch(type) {
case START_EVENT_V3: return "Start_v3";
case STOP_EVENT: return "Stop";
case QUERY_EVENT: return "Query";
case ROTATE_EVENT: return "Rotate";
case INTVAR_EVENT: return "Intvar";
case LOAD_EVENT: return "Load";
case NEW_LOAD_EVENT: return "New_load";
case SLAVE_EVENT: return "Slave";
case CREATE_FILE_EVENT: return "Create_file";
case APPEND_BLOCK_EVENT: return "Append_block";
case DELETE_FILE_EVENT: return "Delete_file";
case EXEC_LOAD_EVENT: return "Exec_load";
case RAND_EVENT: return "RAND";
case XID_EVENT: return "Xid";
case USER_VAR_EVENT: return "User var";
case FORMAT_DESCRIPTION_EVENT: return "Format_desc";
case TABLE_MAP_EVENT: return "Table_map";
case PRE_GA_WRITE_ROWS_EVENT: return "Write_rows_event_old";
case PRE_GA_UPDATE_ROWS_EVENT: return "Update_rows_event_old";
case PRE_GA_DELETE_ROWS_EVENT: return "Delete_rows_event_old";
case WRITE_ROWS_EVENT: return "Write_rows";
case UPDATE_ROWS_EVENT: return "Update_rows";
case DELETE_ROWS_EVENT: return "Delete_rows";
case BEGIN_LOAD_QUERY_EVENT: return "Begin_load_query";
case EXECUTE_LOAD_QUERY_EVENT: return "Execute_load_query";
case INCIDENT_EVENT: return "Incident";
case USER_DEFINED: return "User defined";
case GTID_EVENT_MYSQL: return "GTID MYSQL";
case GTID_EVENT_MARIADB: return "GTID MARIADB";
default: return "Unknown";
}
}
} // end namespace system
Binary_log_event::~Binary_log_event()
{
}
Binary_log_event * create_incident_event(unsigned int type, const char *message, unsigned long pos)
{
Incident_event *incident= new Incident_event();
incident->header()->type_code= INCIDENT_EVENT;
incident->header()->next_position= pos;
incident->header()->event_length= LOG_EVENT_HEADER_SIZE + 2 + strlen(message);
incident->type= type;
incident->message.append(message);
return incident;
}
} // end namespace mysql

View File

@ -1,299 +0,0 @@
/*
Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights
reserved.
Copyright (c) 2013-2014, MariaDB Corporation Ab
Portions of this file contain modifications contributed and copyrighted by
MariaDB Corporation, Ab. Those modifications are gratefully acknowledged and are described
briefly in the source code.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
02110-1301 USA
*/
/*
MariaDB Corporation change details:
- Added support for GTID event handling for both MySQL and MariaDB
Author: Jan Lindström (jan.lindstrom@mariadb.com
*/
#ifndef _BINLOG_EVENT_H
#define _BINLOG_EVENT_H
#include <boost/cstdint.hpp>
#include <list>
#include <boost/asio.hpp>
#include <boost/function.hpp>
#include <vector>
#include "gtid.h"
namespace mysql
{
/**
@enum Log_event_type
Enumeration type for the different types of log events.
*/
enum Log_event_type
{
/*
Every time you update this enum (when you add a type), you have to
fix Format_description_log_event::Format_description_log_event().
*/
UNKNOWN_EVENT= 0,
START_EVENT_V3= 1,
QUERY_EVENT= 2,
STOP_EVENT= 3,
ROTATE_EVENT= 4,
INTVAR_EVENT= 5,
LOAD_EVENT= 6,
SLAVE_EVENT= 7,
CREATE_FILE_EVENT= 8,
APPEND_BLOCK_EVENT= 9,
EXEC_LOAD_EVENT= 10,
DELETE_FILE_EVENT= 11,
/*
NEW_LOAD_EVENT is like LOAD_EVENT except that it has a longer
sql_ex, allowing multibyte TERMINATED BY etc; both types share the
same class (Load_log_event)
*/
NEW_LOAD_EVENT= 12,
RAND_EVENT= 13,
USER_VAR_EVENT= 14,
FORMAT_DESCRIPTION_EVENT= 15,
XID_EVENT= 16,
BEGIN_LOAD_QUERY_EVENT= 17,
EXECUTE_LOAD_QUERY_EVENT= 18,
TABLE_MAP_EVENT = 19,
/*
These event numbers were used for 5.1.0 to 5.1.15 and are
therefore obsolete.
*/
PRE_GA_WRITE_ROWS_EVENT = 20,
PRE_GA_UPDATE_ROWS_EVENT = 21,
PRE_GA_DELETE_ROWS_EVENT = 22,
/*
These event numbers are used from 5.1.16 and forward
*/
WRITE_ROWS_EVENT = 23,
UPDATE_ROWS_EVENT = 24,
DELETE_ROWS_EVENT = 25,
/*
Something out of the ordinary happened on the master
*/
INCIDENT_EVENT= 26,
/*
* A user defined event
*/
USER_DEFINED= 27,
/* We have two different implementations of global transaction id */
GTID_EVENT_MYSQL=33,
GTID_EVENT_MARIADB= 162,
/*
Add new events here - right above this comment!
Existing events (except ENUM_END_EVENT) should never change their numbers
*/
ENUM_END_EVENT /* end marker */
};
namespace system {
/**
* Convenience function to get the string representation of a binlog event.
*/
const char* get_event_type_str(Log_event_type type);
} // end namespace system
#define LOG_EVENT_HEADER_SIZE 20
class Log_event_header
{
public:
boost::uint8_t marker; // always 0 or 0xFF
boost::uint32_t timestamp;
boost::uint8_t type_code;
boost::uint32_t server_id;
boost::uint32_t event_length;
boost::uint32_t next_position;
boost::uint16_t flags;
};
class Binary_log_event;
/**
* TODO Base class for events. Implementation is in body()
*/
class Binary_log_event
{
public:
Binary_log_event()
{
/*
An event length of 0 indicates that the header isn't initialized
*/
m_header.event_length= 0;
m_header.type_code= 0;
}
Binary_log_event(Log_event_header *header)
{
m_header= *header;
}
virtual ~Binary_log_event();
/**
* Helper method
*/
enum Log_event_type get_event_type() const
{
return (enum Log_event_type) m_header.type_code;
}
/**
* Return a pointer to the header of the log event
*/
Log_event_header *header() { return &m_header; }
private:
Log_event_header m_header;
};
class Query_event: public Binary_log_event
{
public:
Query_event(Log_event_header *header) : Binary_log_event(header) {}
boost::uint32_t thread_id;
boost::uint32_t exec_time;
boost::uint16_t error_code;
std::vector<boost::uint8_t > variables;
std::string db_name;
std::string query;
};
class Gtid_event: public Binary_log_event
{
public:
Gtid_event(Log_event_header *header) : Binary_log_event(header) {}
size_t gtid_length() { return MYSQL_GTID_ENCODED_SIZE;}
boost::uint32_t domain_id;
boost::uint32_t server_id;
boost::uint64_t sequence_number;
unsigned char m_mysql_gtid[MYSQL_GTID_ENCODED_SIZE];
Gtid m_gtid;
};
class Rotate_event: public Binary_log_event
{
public:
Rotate_event(Log_event_header *header) : Binary_log_event(header) {}
std::string binlog_file;
boost::uint64_t binlog_pos;
};
class Format_event: public Binary_log_event
{
public:
Format_event(Log_event_header *header) : Binary_log_event(header) {}
boost::uint16_t binlog_version;
std::string master_version;
boost::uint32_t created_ts;
boost::uint8_t log_header_len;
};
class User_var_event: public Binary_log_event
{
public:
enum Value_type {
STRING_TYPE,
REAL_TYPE,
INT_TYPE,
ROW_TYPE,
DECIMAL_TYPE,
VALUE_TYPE_COUNT
};
User_var_event(Log_event_header *header) : Binary_log_event(header) {}
std::string name;
boost::uint8_t is_null;
boost::uint8_t type;
boost::uint32_t charset; /* charset of the string */
std::string value; /* encoded in binary speak, depends on .type */
};
class Table_map_event: public Binary_log_event
{
public:
Table_map_event(Log_event_header *header) : Binary_log_event(header) {}
boost::uint64_t table_id;
boost::uint16_t flags;
std::string db_name;
std::string table_name;
std::vector<uint8_t> columns;
std::vector<uint8_t> metadata;
std::vector<uint8_t> null_bits;
};
class Row_event: public Binary_log_event
{
public:
Row_event(Log_event_header *header) : Binary_log_event(header) {}
boost::uint64_t table_id;
boost::uint16_t flags;
boost::uint64_t columns_len;
boost::uint32_t null_bits_len;
std::vector<boost::uint8_t> columns_before_image;
std::vector<uint8_t> used_columns;
std::vector<uint8_t> row;
};
class Int_var_event: public Binary_log_event
{
public:
Int_var_event(Log_event_header *header) : Binary_log_event(header) {}
boost::uint8_t type;
boost::uint64_t value;
};
class Incident_event: public Binary_log_event
{
public:
Incident_event() : Binary_log_event() {}
Incident_event(Log_event_header *header) : Binary_log_event(header) {}
boost::uint8_t type;
std::string message;
};
class Xid: public Binary_log_event
{
public:
Xid(Log_event_header *header) : Binary_log_event(header) {}
boost::uint64_t xid_id;
};
Binary_log_event *create_incident_event(unsigned int type, const char *message, unsigned long pos= 0);
} // end namespace mysql
#endif /* _BINLOG_EVENT_H */

View File

@ -1,91 +0,0 @@
/*
Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights
reserved.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
02110-1301 USA
*/
#ifndef _BOUNDED_BUFFER_H
#define _BOUNDED_BUFFER_H
#include <boost/circular_buffer.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
#include <boost/thread/thread.hpp>
#include <boost/progress.hpp>
#include <boost/bind.hpp>
template <class T>
class bounded_buffer
{
public:
typedef boost::circular_buffer<T> container_type;
typedef typename container_type::size_type size_type;
typedef typename container_type::value_type value_type;
explicit bounded_buffer(size_type capacity) : m_unread(0), m_container(capacity) {}
void push_front(const value_type& item)
{
boost::mutex::scoped_lock lock(m_mutex);
m_not_full.wait(lock, boost::bind(&bounded_buffer<value_type>::is_not_full, this));
m_container.push_front(item);
++m_unread;
lock.unlock();
m_not_empty.notify_one();
}
void pop_back(value_type* pItem)
{
boost::mutex::scoped_lock lock(m_mutex);
m_not_empty.wait(lock, boost::bind(&bounded_buffer<value_type>::is_not_empty, this));
*pItem = m_container[--m_unread];
lock.unlock();
m_not_full.notify_one();
}
bool has_unread()
{
boost::mutex::scoped_lock lock(m_mutex);
return is_not_empty();
}
void lock()
{
m_mutex.lock();
}
void unlock()
{
m_mutex.unlock();
}
private:
bounded_buffer(const bounded_buffer&); // Disabled copy constructor
bounded_buffer& operator = (const bounded_buffer&); // Disabled assign operator
bool is_not_empty() const { return m_unread > 0; }
bool is_not_full() const { return m_unread < m_container.capacity(); }
size_type m_unread;
container_type m_container;
boost::mutex m_mutex;
boost::condition m_not_empty;
boost::condition m_not_full;
};
#endif /* _BOUNDED_BUFFER_H */

View File

@ -1,197 +0,0 @@
/*
Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights
reserved.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
02110-1301 USA
*/
#ifndef _FIELD_ITERATOR_H
#define _FIELD_ITERATOR_H
#include "binlog_event.h"
#include "value.h"
#include "row_of_fields.h"
#include <vector>
#include <mysql.h>
using namespace mysql;
namespace mysql {
bool is_null(unsigned char *bitmap, int index);
int lookup_metadata_field_size(enum enum_field_types field_type);
boost::uint32_t extract_metadata(const Table_map_event *map, int col_no);
template <class Iterator_value_type >
class Row_event_iterator : public std::iterator<std::forward_iterator_tag,
Iterator_value_type>
{
public:
Row_event_iterator() : m_row_event(0), m_table_map(0),
m_new_field_offset_calculated(0), m_field_offset(0)
{ }
Row_event_iterator(const Row_event *row_event,
const Table_map_event *table_map)
: m_row_event(row_event), m_table_map(table_map),
m_new_field_offset_calculated(0)
{
m_field_offset= 0;
}
Iterator_value_type operator*();
Row_event_iterator& operator++();
Row_event_iterator operator++(int);
bool operator==(const Row_event_iterator& x) const;
bool operator!=(const Row_event_iterator& x) const;
//Row_iterator end() const;
private:
size_t fields(Iterator_value_type& fields_vector );
const Row_event *m_row_event;
const Table_map_event *m_table_map;
unsigned long m_new_field_offset_calculated;
unsigned long m_field_offset;
};
template <class Iterator_value_type>
size_t Row_event_iterator<Iterator_value_type>::fields(Iterator_value_type& fields_vector )
{
size_t field_offset= m_field_offset;
int row_field_col_index= 0;
std::vector<boost::uint8_t> nullbits(m_row_event->null_bits_len);
std::copy(m_row_event->row.begin()+m_field_offset,
m_row_event->row.begin()+(m_field_offset+m_row_event->null_bits_len),
nullbits.begin());
field_offset += m_row_event->null_bits_len;
for(unsigned col_no=0; col_no < m_table_map->columns.size(); ++col_no)
{
++row_field_col_index;
unsigned int type= m_table_map->columns[col_no]&0xFF;
boost::uint32_t metadata= extract_metadata(m_table_map, col_no);
mysql::Value val((enum enum_field_types)type,
metadata,
(const char *)&m_row_event->row[field_offset]);
if (is_null((unsigned char *)&nullbits[0], col_no ))
{
val.is_null(true);
}
else
{
/*
If the value is null it is not in the list of values and thus we won't
increse the offset. TODO what if all values are null?!
*/
field_offset += val.length();
}
fields_vector.push_back(val);
}
return field_offset;
}
template <class Iterator_value_type >
Iterator_value_type Row_event_iterator<Iterator_value_type>::operator*()
{ // dereferencing
Iterator_value_type fields_vector;
/*
* Remember this offset if we need to increate the row pointer
*/
m_new_field_offset_calculated= fields(fields_vector);
return fields_vector;
}
template< class Iterator_value_type >
Row_event_iterator< Iterator_value_type >&
Row_event_iterator< Iterator_value_type >::operator++()
{ // prefix
if (m_field_offset < m_row_event->row.size())
{
/*
* If we requested the fields in a previous operations
* we also calculated the new offset at the same time.
*/
if (m_new_field_offset_calculated != 0)
{
m_field_offset= m_new_field_offset_calculated;
//m_field_offset += m_row_event->null_bits_len;
m_new_field_offset_calculated= 0;
if (m_field_offset >= m_row_event->row.size())
m_field_offset= 0;
return *this;
}
/*
* Advance the field offset to the next row
*/
int row_field_col_index= 0;
std::vector<uint8_t> nullbits(m_row_event->null_bits_len);
std::copy(m_row_event->row.begin()+m_field_offset,
m_row_event->row.begin()+(m_field_offset+m_row_event->null_bits_len),
nullbits.begin());
m_field_offset += m_row_event->null_bits_len;
for(unsigned col_no=0; col_no < m_table_map->columns.size(); ++col_no)
{
++row_field_col_index;
mysql::Value val((enum enum_field_types)m_table_map->columns[col_no],
m_table_map->metadata[col_no],
(const char *)&m_row_event->row[m_field_offset]);
if (!is_null((unsigned char *)&nullbits[0], col_no))
{
m_field_offset += val.length();
}
}
return *this;
}
m_field_offset= 0;
return *this;
}
template <class Iterator_value_type >
Row_event_iterator< Iterator_value_type >
Row_event_iterator< Iterator_value_type >::operator++(int)
{ // postfix
Row_event_iterator temp = *this;
++*this;
return temp;
}
template <class Iterator_value_type >
bool Row_event_iterator< Iterator_value_type >::operator==(const Row_event_iterator& x) const
{
return m_field_offset == x.m_field_offset;
}
template <class Iterator_value_type >
bool Row_event_iterator< Iterator_value_type >::operator!=(const Row_event_iterator& x) const
{
return m_field_offset != x.m_field_offset;
}
}
#endif /* _FIELD_ITERATOR_H */

View File

@ -1,118 +0,0 @@
/*
Copyright (C) 2013, MariaDB Corporation Ab
This file is distributed as part of the MariaDB Corporation MaxScale. It is free
software: you can redistribute it and/or modify it under the terms of the
GNU General Public License as published by the Free Software Foundation,
version 2.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
details.
You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc., 51
Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
Author: Jan Lindström jan.lindstrom@mariadb.com
*/
#include <boost/asio.hpp>
#include <boost/function.hpp>
#include <boost/bind.hpp>
#include "gtid.h"
#include "listener_exception.h"
#include <mysql.h>
#include <my_global.h>
#include <my_byteorder.h>
namespace mysql
{
Gtid::Gtid(const boost::uint32_t domain_id,
const boost::uint32_t server_id,
const boost::uint64_t sequence_number)
: m_real_gtid(true),
m_domain_id(domain_id),
m_server_id(server_id),
m_sequence_number(sequence_number),
m_server_type(MYSQL_SERVER_TYPE_MARIADB)
{
memset(m_mysql_gtid, 0, MYSQL_GTID_ENCODED_SIZE);
m_mariadb_gtid = to_string(m_domain_id) + std::string("-") + to_string(m_server_id) + std::string("-") + to_string(m_sequence_number);
m_gtid_length = m_mariadb_gtid.length();
}
Gtid::Gtid(const unsigned char *mysql_gtid,
const boost::uint64_t gno)
:m_real_gtid(true),
m_domain_id(0),
m_server_id(0),
m_sequence_number(gno),
m_server_type(MYSQL_SERVER_TYPE_MYSQL),
m_gtid_length(MYSQL_GTID_ENCODED_SIZE)
{
memcpy(m_mysql_gtid, mysql_gtid, MYSQL_GTID_ENCODED_SIZE);
}
Gtid::Gtid(const unsigned char* mysql_gtid)
:m_real_gtid(true),
m_domain_id(0),
m_server_id(0),
m_sequence_number(0),
m_server_type(MYSQL_SERVER_TYPE_MYSQL),
m_gtid_length(MYSQL_GTID_ENCODED_SIZE)
{
int i,k;
char tmp[2];
char *sid = (char *)mysql_gtid;
for(i=0,k=0; i < 16*2; i+=2,k++) {
unsigned int c;
tmp[0] = sid[i];
tmp[1] = sid[i+1];
sscanf((const char *)tmp, "%02x", &c);
m_mysql_gtid[k]=(unsigned char)c;
}
i++;
k++;
sscanf((const char *)&(sid[i]), "%lu", &m_sequence_number);
int8store(&(m_mysql_gtid[k]), m_sequence_number);
std::cout << "GTID:: " << m_mysql_gtid << " " << std::endl;
}
std::string Gtid::get_string() const
{
if (m_server_type == MYSQL_SERVER_TYPE_MARIADB) {
return (m_mariadb_gtid);
} else {
std::string hexs;
unsigned char *sid = (unsigned char *)m_mysql_gtid;
char tmp[2];
// Dump the encoded SID using hexadesimal representation
// Making it little bit more usefull
for(size_t i=0;i < 16;i++) {
sprintf((char *)tmp, "%02x", (unsigned char)sid[i]);
hexs.append(std::string((const char *)tmp));
}
return(hexs + std::string(":") + to_string(m_sequence_number));
}
}
const unsigned char* Gtid::get_gtid() const
{
if (m_server_type == MYSQL_SERVER_TYPE_MARIADB) {
return ((const unsigned char *)m_mariadb_gtid.c_str());
} else {
return (m_mysql_gtid);
}
}
}

View File

@ -1,101 +0,0 @@
/*
Copyright (C) 2013-2014, MariaDB Corporation Ab
This file is distributed as part of the MariaDB Corporation MaxScale. It is free
software: you can redistribute it and/or modify it under the terms of the
GNU General Public License as published by the Free Software Foundation,
version 2.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
details.
You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc., 51
Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
Author: Jan Lindström jan.lindstrom@mariadb.com
*/
#ifndef REPLICATION_LISTENER_MYSQL_GTID_H
#define REPLICATION_LISTENER_MYSQL_GTID_H
#include <boost/asio.hpp>
#include <boost/function.hpp>
#include <boost/bind.hpp>
namespace mysql
{
template <class T>
inline std::string gno_to_string (const T& t)
{
std::stringstream ss;
ss << t;
return ss.str();
}
enum mysql_server_types {
MYSQL_SERVER_TYPE_NA = 0,
MYSQL_SERVER_TYPE_MARIADB = 1,
MYSQL_SERVER_TYPE_MYSQL = 2
};
#define MYSQL_GTID_ENCODED_SIZE 24
class Gtid
{
public:
Gtid()
: m_real_gtid(false), m_domain_id(0), m_server_id(0), m_sequence_number(0),
m_server_type(MYSQL_SERVER_TYPE_NA), m_gtid_length(0), m_mariadb_gtid(std::string(""))
{
memset(m_mysql_gtid, 0, MYSQL_GTID_ENCODED_SIZE);
}
Gtid(const boost::uint32_t domain_id,
const boost::uint32_t server_id,
const boost::uint64_t sequence_number);
Gtid(const unsigned char *mysql_gtid,
const boost::uint64_t gno);
Gtid(const unsigned char *mysql_gtid);
~Gtid() {}
bool is_real_gtid() const { return m_real_gtid;}
const unsigned char* get_mysql_gtid() const { return m_mysql_gtid; }
const unsigned char* get_gtid() const;
size_t get_gtid_length() const { return m_gtid_length; }
std::string get_string() const;
boost::uint32_t get_domain_id() const { return m_domain_id; }
boost::uint32_t get_server_id() const { return m_server_id; }
boost::uint32_t get_sequence_number() const { return m_sequence_number; }
private:
bool m_real_gtid;
mysql_server_types m_server_type;
boost::uint32_t m_domain_id;
boost::uint32_t m_server_id;
boost::uint64_t m_sequence_number;
boost::uint32_t m_gtid_length;
unsigned char m_mysql_gtid[MYSQL_GTID_ENCODED_SIZE];
std::string m_mariadb_gtid;
};
}
#endif

View File

@ -1,4 +0,0 @@
/usr/local/lib/libreplication.so.0.1
/usr/local/lib/libreplication.so.1
/usr/local/lib/libreplication.so
/usr/local/lib/libreplication.a

View File

@ -1,58 +0,0 @@
/*
Copyright (C) 2013-2014, MariaDB Corporation Ab
This file is distributed as part of the MariaDB Corporation MaxScale. It is free
software: you can redistribute it and/or modify it under the terms of the
GNU General Public License as published by the Free Software Foundation,
version 2.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
details.
You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc., 51
Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
Author: Jan Lindström jan.lindstrom@mariadb.com
*/
#ifndef REPLICATION_LISTENER_MYSQL_ERROR_EXCEPTION
#define REPLICATION_LISTENER_MYSQL_ERROR_EXCEPTION
namespace mysql
{
// Derive from std::runtime_error rather than std::exception
// runtime_error's constructor can take a string as parameter
// the standard's compliant version of std::exception can not
// (though some compiler provide a non standard constructor).
//
#include <sstream>
#include <boost/system/system_error.hpp>
// Helper function
template <class T>
inline std::string to_string (const T& t)
{
std::stringstream ss;
ss << t;
return ss.str();
}
class ListenerException : public std::runtime_error
{
public:
ListenerException(std::string message, const char *file, int line)
: std::runtime_error(std::string("Exception: ") + message + std::string(" file: ") + std::string(file) + std::string(" line: ") + (to_string(line))) {}
};
}
#endif

View File

@ -1,629 +0,0 @@
/*
Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights
reserved.
Copyright (c) 2013, MariaDB Corporation Ab
Portions of this file contain modifications contributed and copyrighted by
MariaDB Corporation, Ab. Those modifications are gratefully acknowledged and are described
briefly in the source code.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
02110-1301 USA
*/
/*
MariaDB Corporation change details:
- Added support for GTID event handling for both MySQL and MariaDB
Author: Jan Lindström (jan.lindstrom@mariadb.com
*/
#include <stdint.h>
#include <boost/array.hpp>
#include <vector>
#include "protocol.h"
#include "listener_exception.h"
#include <iostream>
#include <mysql.h>
#include <my_global.h>
#include <mysql_com.h>
using namespace mysql;
using namespace mysql::system;
namespace mysql { namespace system {
int proto_read_package_header(tcp::socket *socket, unsigned long *packet_length, unsigned char *packet_no)
{
unsigned char buf[4];
try {
boost::asio::read(*socket, boost::asio::buffer(buf, 4),
boost::asio::transfer_at_least(4));
}
catch (boost::system::system_error const& e)
{
throw(ListenerException("Read package header error: " + std::string(e.what()), __FILE__, __LINE__));
}
catch (boost::system::error_code const& e)
{
throw(ListenerException("Read package header error: " + e.message(), __FILE__, __LINE__));
}
catch (std::exception& e)
{
throw(ListenerException("Read package header error: " + std::string(e.what()), __FILE__, __LINE__));
}
*packet_length= (unsigned long)(buf[0] &0xFF);
*packet_length+= (unsigned long)((buf[1] &0xFF)<<8);
*packet_length+= (unsigned long)((buf[2] &0xFF)<<16);
*packet_no= (unsigned char)buf[3];
return 0;
}
int proto_read_package_header(tcp::socket *socket, boost::asio::streambuf &buff, unsigned long *packet_length, unsigned char *packet_no)
{
std::streamsize inbuff= buff.in_avail();
if( inbuff < 0)
inbuff= 0;
if (4 > inbuff)
{
try {
boost::asio::read(*socket, buff,
boost::asio::transfer_at_least(4-inbuff));
}
catch (boost::system::system_error const& e)
{
throw(ListenerException("Read package header error: " + std::string(e.what()), __FILE__, __LINE__));
}
catch (boost::system::error_code const& e)
{
throw(ListenerException("Read package header error: " + e.message(), __FILE__, __LINE__));
}
catch (std::exception& e)
{
throw(ListenerException("Read package header error: " + std::string(e.what()), __FILE__, __LINE__));
}
}
char ch;
std::istream is(&buff);
is.get(ch);
*packet_length= (unsigned long)ch;
is.get(ch);
*packet_length+= (unsigned long)(ch<<8);
is.get(ch);
*packet_length+= (unsigned long)(ch<<16);
is.get(ch);
*packet_no= (unsigned char)ch;
return 0;
}
int proto_get_one_package(tcp::socket *socket, boost::asio::streambuf &buff,
boost::uint8_t *packet_no)
{
unsigned long packet_length;
if (proto_read_package_header(socket, buff, &packet_length, packet_no))
return 0;
std::streamsize inbuffer= buff.in_avail();
if (inbuffer < 0)
inbuffer= 0;
if (packet_length > inbuffer)
boost::asio::read(*socket, buff,
boost::asio::transfer_at_least(packet_length-inbuffer));
return packet_length;
}
void prot_parse_error_message(std::istream &is, struct st_error_package &err,
int packet_length)
{
boost::uint8_t marker;
Protocol_chunk<boost::uint16_t> prot_errno(err.error_code);
Protocol_chunk<boost::uint8_t> prot_marker(marker);
Protocol_chunk<boost::uint8_t> prot_sql_state(err.sql_state,5);
is >> prot_errno
>> prot_marker
>> prot_sql_state;
// TODO is the number of bytes read = is.tellg() ?
int message_size= packet_length -2 -1 -5; // the remaining part of the package
Protocol_chunk_string prot_message(err.message, message_size);
is >> prot_message;
err.message[message_size]= '\0';
}
void prot_parse_ok_message(std::istream &is, struct st_ok_package &ok, int packet_length)
{
// TODO: Assure that zero length messages can be but on the input stream.
//Protocol_chunk<boost::uint8_t> prot_result_type(result_type);
Protocol_chunk<boost::uint64_t> prot_affected_rows(ok.affected_rows);
Protocol_chunk<boost::uint64_t> prot_insert_id(ok.insert_id);
Protocol_chunk<boost::uint16_t> prot_server_status(ok.server_status);
Protocol_chunk<boost::uint16_t> prot_warning_count(ok.warning_count);
int message_size= packet_length -2 -prot_affected_rows.size()
-prot_insert_id.size() -prot_server_status.size()
-prot_warning_count.size();
prot_affected_rows.set_length_encoded_binary(true);
prot_insert_id.set_length_encoded_binary(true);
is >> prot_affected_rows
>> prot_insert_id
>> prot_server_status
>> prot_warning_count;
if (message_size > 0)
{
Protocol_chunk_string prot_message(ok.message, message_size);
is >> prot_message;
ok.message[message_size]= '\0';
}
}
void prot_parse_eof_message(std::istream &is, struct st_eof_package &eof)
{
Protocol_chunk<boost::uint16_t> proto_warning_count(eof.warning_count);
Protocol_chunk<boost::uint16_t> proto_status_flags(eof.status_flags);
is >> proto_warning_count
>> proto_status_flags;
}
void proto_get_handshake_package(std::istream &is,
struct st_handshake_package &p,
int packet_length)
{
boost::uint8_t filler;
boost::uint8_t filler2[13];
Protocol_chunk<boost::uint8_t> proto_protocol_version(p.protocol_version);
Protocol_chunk<boost::uint32_t> proto_thread_id(p.thread_id);
Protocol_chunk<boost::uint8_t> proto_scramble_buffer(p.scramble_buff, 8);
Protocol_chunk<boost::uint8_t> proto_filler(filler);
Protocol_chunk<boost::uint16_t> proto_server_capabilities(p.server_capabilities);
Protocol_chunk<boost::uint8_t> proto_server_language(p.server_language);
Protocol_chunk<boost::uint16_t> proto_server_status(p.server_status);
Protocol_chunk<boost::uint8_t> proto_filler2(filler2,13);
Protocol_chunk<boost::uint8_t> proto_scramble_buffer2(p.scramble_buff2, 13);
is >> proto_protocol_version
>> p.server_version_str
>> proto_thread_id
>> proto_scramble_buffer
>> proto_filler
>> proto_server_capabilities
>> proto_server_language
>> proto_server_status
>> proto_filler2
>> proto_scramble_buffer2;
//assert(filler == 0);
int remaining_bytes= packet_length - 9+13+13+8;
boost::uint8_t extention_buffer[remaining_bytes];
if (remaining_bytes > 0)
{
Protocol_chunk<boost::uint8_t> proto_extension(extention_buffer, remaining_bytes);
is >> proto_extension;
}
//std::copy(&extention_buffer[0],&extention_buffer[remaining_bytes],std::ostream_iterator<char>(std::cout,","));
}
void write_packet_header(char *buff, boost::uint16_t size, boost::uint8_t packet_no)
{
int3store(buff, size);
buff[3]= (char)packet_no;
}
buffer_source &operator>>(buffer_source &src, Protocol &chunk)
{
char ch;
int ct= 0;
char *ptr= (char*)chunk.data();
while(ct < chunk.size() && src.m_ptr < src.m_size)
{
ptr[ct]= src.m_src[src.m_ptr];
++ct;
++src.m_ptr;
}
return src;
}
std::istream &operator>>(std::istream &is, Protocol &chunk)
{
if (chunk.is_length_encoded_binary())
{
int ct= 0;
is.read((char *)chunk.data(),1);
unsigned char byte= *(unsigned char *)chunk.data();
if (byte < 250)
{
chunk.collapse_size(1);
return is;
}
else if (byte == 251)
{
// is this a row data packet? if so, then this column value is NULL
chunk.collapse_size(1);
ct= 1;
}
else if (byte == 252)
{
chunk.collapse_size(2);
ct= 1;
}
else if(byte == 253)
{
chunk.collapse_size(3);
ct= 1;
}
/* Read remaining bytes */
//is.read((char *)chunk.data(), chunk.size()-1);
char ch;
char *ptr= (char*)chunk.data();
while(ct < chunk.size())
{
is.get(ch);
ptr[ct]= ch;
++ct;
}
}
else
{
char ch;
int ct= 0;
char *ptr= (char*)chunk.data();
int sz= chunk.size();
while(ct < sz)
{
is.get(ch);
ptr[ct]= ch;
++ct;
}
}
return is;
}
std::istream &operator>>(std::istream &is, std::string &str)
{
std::ostringstream out;
char ch;
int ct= 0;
do
{
is.get(ch);
out.put(ch);
++ct;
} while (is.good() && ch != '\0');
str.append(out.str());
return is;
}
std::istream &operator>>(std::istream &is, Protocol_chunk_string &str)
{
char ch;
int ct= 0;
int sz= str.m_str->size();
for (ct=0; ct< sz && is.good(); ct++)
{
is.get(ch);
str.m_str->at(ct)= ch;
}
return is;
}
std::istream &operator>>(std::istream &is, Protocol_chunk_string_len &lenstr)
{
boost::uint8_t len;
std::string *str= lenstr.m_storage;
Protocol_chunk<boost::uint8_t> proto_str_len(len);
is >> proto_str_len;
Protocol_chunk_string proto_str(*str, len);
is >> proto_str;
return is;
}
std::ostream &operator<<(std::ostream &os, Protocol &chunk)
{
if (!os.bad())
os.write((const char *) chunk.data(),chunk.size());
return os;
}
Query_event *proto_query_event(std::istream &is, Log_event_header *header)
{
boost::uint8_t db_name_len;
boost::uint16_t var_size;
// Length of query stored in the payload.
boost::uint32_t query_len;
Query_event *qev=new Query_event(header);
Protocol_chunk<boost::uint32_t> proto_query_event_thread_id(qev->thread_id);
Protocol_chunk<boost::uint32_t> proto_query_event_exec_time(qev->exec_time);
Protocol_chunk<boost::uint8_t> proto_query_event_db_name_len(db_name_len);
Protocol_chunk<boost::uint16_t> proto_query_event_error_code(qev->error_code);
Protocol_chunk<boost::uint16_t> proto_query_event_var_size(var_size);
is >> proto_query_event_thread_id
>> proto_query_event_exec_time
>> proto_query_event_db_name_len
>> proto_query_event_error_code
>> proto_query_event_var_size;
//TODO : Implement it in a better way.
/*
Query length =
Total event length (header->event_length) -
(
(LOG_EVENT_HEADER_SIZE - 1) + //Shouldn't LOG_EVENT_HEADER_SIZE=19?
Thread-id (pre-defined, 4) +
Execution time (pre-defined, 4) +
Placeholder to store database length (pre-defined, 1) +
Error code (pre-defined, 2) +
Placeholder to store length taken by status variable blk (pre-defined, 2) +
Status variable block length (calculated, var_size) +
Database name length (calculated, db_name_len) +
Null terninator (pre-defined, 1) +
)
which gives :
*/
query_len= header->event_length - (LOG_EVENT_HEADER_SIZE + 13 + var_size +
db_name_len);
qev->variables.reserve(var_size);
Protocol_chunk_vector proto_payload(qev->variables, var_size);
is >> proto_payload;
Protocol_chunk_string proto_query_event_db_name(qev->db_name,
(unsigned long)db_name_len);
Protocol_chunk_string proto_query_event_query_str
(qev->query, (unsigned long)query_len);
char zero_marker; // should always be 0;
is >> proto_query_event_db_name
>> zero_marker
>> proto_query_event_query_str;
// Following is not really required now,
//qev->query.resize(qev->query.size() - 1); // Last character is a '\0' character.
return qev;
}
Gtid_event *proto_gtid_event(std::istream &is, Log_event_header *header)
{
Gtid_event *gev=new Gtid_event(header);
boost::uint32_t gtid_length=0;
if (header->type_code == GTID_EVENT_MARIADB) {
Protocol_chunk<boost::uint32_t> proto_gtid_event_domain_id(gev->domain_id);
gev->server_id = header->server_id;
Protocol_chunk<boost::uint64_t> proto_gtid_event_sequence_number(gev->sequence_number);
// In MariaDB GTIDs are just sequence number followed by domain id
is >> proto_gtid_event_sequence_number
>> proto_gtid_event_domain_id;
gev->m_gtid= Gtid(gev->domain_id, gev->server_id, gev->sequence_number);
} else {
// In MySQL GTIDs consists two parts SID and global sequence
// number. SID is stored in encoded format, we will not try to
// understand that. Global sequence number is more meaningfull.
unsigned char gtid_data[MYSQL_GTID_ENCODED_SIZE+1];
memset(gtid_data, 0, MYSQL_GTID_ENCODED_SIZE+1);
is.read((char *)gtid_data, MYSQL_GTID_ENCODED_SIZE);
unsigned char *buf = gtid_data;
buf++; // commit flag, ignore
memcpy(gev->m_mysql_gtid, (char *)buf, MYSQL_GTID_ENCODED_SIZE);
gev->sequence_number = uint8korr(buf+16);
gev->m_gtid= Gtid(gev->m_mysql_gtid, gev->sequence_number);
}
return gev;
}
Rotate_event *proto_rotate_event(std::istream &is, Log_event_header *header)
{
Rotate_event *rev= new Rotate_event(header);
boost::uint32_t file_name_length= header->event_length - 7 - LOG_EVENT_HEADER_SIZE;
Protocol_chunk<boost::uint64_t > prot_position(rev->binlog_pos);
Protocol_chunk_string prot_file_name(rev->binlog_file, file_name_length);
is >> prot_position
>> prot_file_name;
return rev;
}
Incident_event *proto_incident_event(std::istream &is, Log_event_header *header)
{
Incident_event *incident= new Incident_event(header);
Protocol_chunk<boost::uint8_t> proto_incident_code(incident->type);
Protocol_chunk_string_len proto_incident_message(incident->message);
is >> proto_incident_code
>> proto_incident_message;
return incident;
}
Row_event *proto_rows_event(std::istream &is, Log_event_header *header)
{
Row_event *rev=new Row_event(header);
union
{
boost::uint64_t integer;
boost::uint8_t bytes[6];
} table_id;
table_id.integer=0L;
Protocol_chunk<boost::uint8_t> proto_table_id(&table_id.bytes[0], 6);
Protocol_chunk<boost::uint16_t> proto_flags(rev->flags);
Protocol_chunk<boost::uint64_t> proto_column_len(rev->columns_len);
proto_column_len.set_length_encoded_binary(true);
is >> proto_table_id
>> proto_flags
>> proto_column_len;
rev->table_id=table_id.integer;
int used_column_len=(int) ((rev->columns_len + 7) / 8);
Protocol_chunk_vector proto_used_columns(rev->used_columns, used_column_len);
rev->null_bits_len= used_column_len;
is >> proto_used_columns;
if (header->type_code == UPDATE_ROWS_EVENT)
{
Protocol_chunk_vector proto_columns_before_image(rev->columns_before_image, used_column_len);
is >> proto_columns_before_image;
}
int bytes_read=proto_table_id.size() + proto_flags.size() + proto_column_len.size() + used_column_len;
if (header->type_code == UPDATE_ROWS_EVENT)
bytes_read+=used_column_len;
unsigned long row_len= header->event_length - bytes_read - LOG_EVENT_HEADER_SIZE + 1;
//std::cout << "Bytes read: " << bytes_read << " Bytes expected: " << rev->row_len << std::endl;
Protocol_chunk_vector proto_row(rev->row, row_len);
is >> proto_row;
return rev;
}
Int_var_event *proto_intvar_event(std::istream &is, Log_event_header *header)
{
Int_var_event *event= new Int_var_event(header);
Protocol_chunk<boost::uint8_t> proto_type(event->type);
Protocol_chunk<boost::uint64_t> proto_value(event->value);
is >> proto_type
>> proto_value;
return event;
}
User_var_event *proto_uservar_event(std::istream &is, Log_event_header *header)
{
User_var_event *event= new User_var_event(header);
boost::uint32_t name_len;
Protocol_chunk<boost::uint32_t> proto_name_len(name_len);
is >> proto_name_len;
Protocol_chunk_string proto_name(event->name, name_len);
Protocol_chunk<boost::uint8_t> proto_null(event->is_null);
is >> proto_name >> proto_null;
if (event->is_null)
{
event->type = User_var_event::STRING_TYPE;
event->charset = 63; // Binary charset
}
else
{
boost::uint32_t value_len;
Protocol_chunk<boost::uint8_t> proto_type(event->type);
Protocol_chunk<boost::uint32_t> proto_charset(event->charset);
Protocol_chunk<boost::uint32_t> proto_val_len(value_len);
is >> proto_type >> proto_charset >> proto_val_len;
Protocol_chunk_string proto_value(event->value, value_len);
is >> proto_value;
}
return event;
}
Table_map_event *proto_table_map_event(std::istream &is, Log_event_header *header)
{
Table_map_event *tmev=new Table_map_event(header);
boost::uint64_t columns_len= 0;
boost::uint64_t metadata_len= 0;
union
{
boost::uint64_t integer;
boost::uint8_t bytes[6];
} table_id;
char zero_marker= 0;
table_id.integer=0L;
Protocol_chunk<boost::uint8_t> proto_table_id(&table_id.bytes[0], 6);
Protocol_chunk<boost::uint16_t> proto_flags(tmev->flags);
Protocol_chunk_string_len proto_db_name(tmev->db_name);
Protocol_chunk<boost::uint8_t> proto_marker(zero_marker); // Should be '\0'
Protocol_chunk_string_len proto_table_name(tmev->table_name);
Protocol_chunk<boost::uint64_t> proto_columns_len(columns_len);
proto_columns_len.set_length_encoded_binary(true);
is >> proto_table_id
>> proto_flags
>> proto_db_name
>> proto_marker
>> proto_table_name
>> proto_marker
>> proto_columns_len;
tmev->table_id=table_id.integer;
Protocol_chunk_vector proto_columns(tmev->columns, columns_len);
Protocol_chunk<boost::uint64_t> proto_metadata_len(metadata_len);
proto_metadata_len.set_length_encoded_binary(true);
is >> proto_columns
>> proto_metadata_len;
Protocol_chunk_vector proto_metadata(tmev->metadata, (unsigned long)metadata_len);
is >> proto_metadata;
unsigned long null_bits_len=(int) ((tmev->columns.size() + 7) / 8);
Protocol_chunk_vector proto_null_bits(tmev->null_bits, null_bits_len);
is >> proto_null_bits;
return tmev;
}
std::istream &operator>>(std::istream &is, Protocol_chunk_vector &chunk)
{
unsigned long size= chunk.m_size;
for(int i=0; i< size; i++)
{
char ch;
is.get(ch);
chunk.m_vec->push_back(ch);
}
return is;
}
} } // end namespace mysql::system

View File

@ -1,310 +0,0 @@
/*
Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights
reserved.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
02110-1301 USA
*/
#ifndef _PROTOCOL_H
#define _PROTOCOL_H
#include <boost/asio.hpp>
#include <list>
#include "binlog_event.h"
using boost::asio::ip::tcp;
namespace mysql {
namespace system {
/**
Storage structure for the handshake package sent from the server to
the client.
*/
struct st_handshake_package
{
boost::uint8_t protocol_version;
std::string server_version_str;
boost::uint32_t thread_id;
boost::uint8_t scramble_buff[8];
boost::uint16_t server_capabilities;
boost::uint8_t server_language;
boost::uint16_t server_status;
boost::uint8_t scramble_buff2[13];
};
/**
Storage structure for the OK package sent from the server to
the client.
*/
struct st_ok_package
{
boost::uint8_t result_type;
boost::uint64_t affected_rows;
boost::uint64_t insert_id;
boost::uint16_t server_status;
boost::uint16_t warning_count;
std::string message;
};
struct st_eof_package
{
boost::uint16_t warning_count;
boost::uint16_t status_flags;
};
/**
Storage structure for the Error package sent from the server to
the client.
*/
struct st_error_package
{
boost::uint16_t error_code;
boost::uint8_t sql_state[5];
std::string message;
};
void write_packet_header(char *buff, boost::uint16_t size, boost::uint8_t packet_no);
class Protocol_validator;
class buffer_source;
/**
* The Protocol interface is used to describe a grammar consisting of
* chunks of bytes that are read or written in consequtive order.
* Example:
* class Protocol_impl : public Protocol;
* Protocol_impl chunk1(chunk_datastore1);
* Protocol_impl chunk2(chunk_datastore2);
* input_stream >> chunk1
* >> chunk2;
*/
class Protocol
{
public:
Protocol() { m_length_encoded_binary= false; }
/**
Return the number of bytes which is read or written by this protocol chunk.
The default size is equal to the underlying storage data type.
*/
virtual unsigned int size()=0;
/** Return a pointer to the first byte in a linear storage buffer */
virtual const char *data()=0;
/**
Change the number of bytes which will be read or written to the storage.
The default size is euqal to the storage type size. This can change if the
protocol is reading a length encoded binary.
The new size must always be less than the size of the underlying storage
type.
*/
virtual void collapse_size(unsigned int new_size)=0;
/**
The first byte will have a special interpretation which will indicate
how many bytes should be read next.
*/
void set_length_encoded_binary(bool bswitch) { m_length_encoded_binary= bswitch; }
bool is_length_encoded_binary(void) { return m_length_encoded_binary; }
private:
bool m_length_encoded_binary;
friend std::istream &operator<<(std::istream &is, Protocol &chunk);
friend std::istream &operator>>(std::istream &is, Protocol &chunk);
friend buffer_source &operator>>(buffer_source &src, Protocol &chunk);
friend std::istream &operator>>(std::istream &is, std::string &str);
};
template<typename T>
class Protocol_chunk : public Protocol
{
public:
Protocol_chunk() : Protocol()
{
m_size= 0;
m_data= 0;
}
Protocol_chunk(T &chunk) : Protocol()
{
m_data= (const char *)&chunk;
m_size= sizeof(T);
}
Protocol_chunk(const T &chunk) : Protocol ()
{
m_data= (const char *) &chunk;
m_size= sizeof(T);
}
/**
* @param buffer A pointer to the storage
* @param size The size of the storage
*
* @note If size == 0 then the chunk is a
* length coded binary.
*/
Protocol_chunk(T *buffer, unsigned long size) : Protocol ()
{
m_data= (const char *)buffer;
m_size= size;
}
virtual unsigned int size() { return m_size; }
virtual const char *data() { return m_data; }
virtual void collapse_size(unsigned int new_size)
{
//assert(new_size <= m_size);
memset((char *)m_data+new_size,'\0', m_size-new_size);
m_size= new_size;
}
private:
const char *m_data;
unsigned long m_size;
};
std::ostream &operator<<(std::ostream &os, Protocol &chunk);
typedef Protocol_chunk<boost::uint8_t> Protocol_chunk_uint8;
class Protocol_chunk_string //: public Protocol_chunk_uint8
{
public:
Protocol_chunk_string(std::string &chunk, unsigned long size) //: Protocol_chunk_uint8()
{
m_str= &chunk;
m_str->assign(size,'*');
}
virtual unsigned int size() const { return m_str->size(); }
virtual const char *data() const { return m_str->data(); }
virtual void collapse_size(unsigned int new_size)
{
m_str->resize(new_size);
}
private:
friend std::istream &operator>>(std::istream &is, Protocol_chunk_string &str);
std::string *m_str;
};
class Protocol_chunk_vector : public Protocol_chunk_uint8
{
public:
Protocol_chunk_vector(std::vector<boost::uint8_t> &chunk, unsigned long size)
: Protocol_chunk_uint8()
{
m_vec= &chunk;
m_vec->reserve(size);
m_size= size;
}
virtual unsigned int size() { return m_vec->size(); }
virtual const char *data() { return reinterpret_cast<const char *>(&*m_vec->begin()); }
virtual void collapse_size(unsigned int new_size)
{
m_vec->resize(new_size);
}
private:
friend std::istream &operator>>(std::istream &is, Protocol_chunk_vector &chunk);
std::vector<boost::uint8_t> *m_vec;
unsigned long m_size;
};
std::istream &operator>>(std::istream &is, Protocol_chunk_vector &chunk);
class buffer_source
{
public:
buffer_source(const char *src, int sz)
{
m_src= src;
m_size= sz;
m_ptr= 0;
}
friend buffer_source &operator>>(buffer_source &src, Protocol &chunk);
private:
const char *m_src;
int m_size;
int m_ptr;
};
class Protocol_chunk_string_len
{
public:
Protocol_chunk_string_len(std::string &str)
{
m_storage= &str;
}
private:
friend std::istream &operator>>(std::istream &is, Protocol_chunk_string_len &lenstr);
std::string *m_storage;
};
buffer_source &operator>>(buffer_source &src, Protocol &chunk);
/** TODO assert that the correct endianess is used */
std::istream &operator>>(std::istream &is, Protocol &chunk);
std::istream &operator>>(std::istream &is, std::string &str);
std::istream &operator>>(std::istream &is, Protocol_chunk_string_len &lenstr);
std::istream &operator>>(std::istream &is, Protocol_chunk_string &str);
int proto_read_package_header(tcp::socket *socket, unsigned long *packet_length, unsigned char *packet_no);
/**
* Read a server package header from a stream buffer
*
* @retval 0 Success
* @retval >0 An error occurred
*/
int proto_read_package_header(tcp::socket *socket, boost::asio::streambuf &buff, unsigned long *packet_length, unsigned char *packet_no);
/**
* Get one complete packet from the server
*
* @param socket Pointer to the active tcp-socket
* @param buff A reference to a stream buffer
* @param packet_no [out] The number of the packet as given by the server
*
* @return the size of the packet or 0 to indicate an error
*/
int proto_get_one_package(tcp::socket *socket, boost::asio::streambuf &buff, boost::uint8_t *packet_no);
void prot_parse_error_message(std::istream &is, struct st_error_package &err, int packet_length);
void prot_parse_ok_message(std::istream &is, struct st_ok_package &ok, int packet_length);
void prot_parse_eof_message(std::istream &is, struct st_eof_package &eof);
void proto_get_handshake_package(std::istream &is, struct st_handshake_package &p, int packet_length);
/**
Allocates a new event and copy the header. The caller must be responsible for
releasing the allocated memory.
*/
Query_event *proto_query_event(std::istream &is, Log_event_header *header);
Rotate_event *proto_rotate_event(std::istream &is, Log_event_header *header);
Incident_event *proto_incident_event(std::istream &is, Log_event_header *header);
Row_event *proto_rows_event(std::istream &is, Log_event_header *header);
Table_map_event *proto_table_map_event(std::istream &is, Log_event_header *header);
Int_var_event *proto_intvar_event(std::istream &is, Log_event_header *header);
User_var_event *proto_uservar_event(std::istream &is, Log_event_header *header);
Gtid_event *proto_gtid_event(std::istream &is, Log_event_header *header);
} // end namespace system
} // end namespace mysql
#endif /* _PROTOCOL_H */

View File

@ -1,180 +0,0 @@
/*
Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights
reserved.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
02110-1301 USA
*/
#include "resultset_iterator.h"
#include "protocol.h"
#include "row_of_fields.h"
using namespace mysql;
namespace mysql {
Result_set::iterator Result_set::begin() { return iterator(this); }
Result_set::iterator Result_set::end() { return iterator(); }
Result_set::const_iterator Result_set::begin() const { return const_iterator(const_cast<Result_set *>(this)); }
Result_set::const_iterator Result_set::end() const { return const_iterator(); }
void Result_set::digest_row_set()
{
unsigned long packet_length;
unsigned char packet_no= 1;
m_current_state= RESULT_HEADER;
boost::asio::streambuf resultbuff;
std::istream response_stream(&resultbuff);
unsigned field_count= 0;
try {
do
{
/*
* Get server response
*/
packet_length= system::proto_get_one_package(m_socket, resultbuff, &packet_no);
switch(m_current_state)
{
case RESULT_HEADER:
system::digest_result_header(response_stream, m_field_count, m_extra);
m_row_count= 0;
m_current_state= FIELD_PACKETS;
break;
case FIELD_PACKETS:
{
Field_packet field;
system::digest_field_packet(response_stream, field);
m_field_types.assign(field_count,field);
if (++field_count == m_field_count)
m_current_state= MARKER;
}
break;
case MARKER:
{
char marker;
response_stream >> marker;
//assert(marker == 0xfe);
system::digest_marker(response_stream);
m_current_state= ROW_CONTENTS;
}
break;
case ROW_CONTENTS:
{
bool is_eof= false;
Row_of_fields row(0);
system::digest_row_content(response_stream, m_field_count, row, m_storage, is_eof);
if (is_eof)
m_current_state= EOF_PACKET;
else
{
m_rows.push_back(row);
++m_row_count;
}
}
break;
default:
continue;
}
} while (m_current_state != EOF_PACKET);
} catch(boost::system::system_error e)
{
// TODO log error
m_field_count= 0;
m_row_count= 0;
}
}
namespace system {
void digest_result_header(std::istream &is, boost::uint64_t &field_count, boost::uint64_t extra)
{
Protocol_chunk<boost::uint64_t> proto_field_count(field_count);
//Protocol_chunk<boost::uint64_t> proto_extra(extra);
proto_field_count.set_length_encoded_binary(true);
//proto_extra.set_length_encoded_binary(true);
is >> proto_field_count;
//>> proto_extra;
}
void digest_field_packet(std::istream &is, Field_packet &field_packet)
{
Protocol_chunk_string_len proto_catalog(field_packet.catalog);
Protocol_chunk_string_len proto_db(field_packet.db);
Protocol_chunk_string_len proto_table(field_packet.table);
Protocol_chunk_string_len proto_org_table(field_packet.org_table);
Protocol_chunk_string_len proto_name(field_packet.name);
Protocol_chunk_string_len proto_org_name(field_packet.org_name);
Protocol_chunk<boost::uint8_t> proto_marker(field_packet.marker);
Protocol_chunk<boost::uint16_t> proto_charsetnr(field_packet.charsetnr);
Protocol_chunk<boost::uint32_t> proto_length(field_packet.length);
Protocol_chunk<boost::uint8_t> proto_type(field_packet.type);
Protocol_chunk<boost::uint16_t> proto_flags(field_packet.flags);
Protocol_chunk<boost::uint8_t> proto_decimals(field_packet.decimals);
Protocol_chunk<boost::uint16_t> proto_filler(field_packet.filler);
//Protocol_chunk<boost::uint64_t> proto_default_value(field_packet.default_value);
is >> proto_catalog
>> proto_db
>> proto_table
>> proto_org_table
>> proto_name
>> proto_org_name
>> proto_marker
>> proto_charsetnr
>> proto_length
>> proto_type
>> proto_flags
>> proto_decimals
>> proto_filler;
}
void digest_marker(std::istream &is)
{
struct st_eof_package eof;
prot_parse_eof_message(is,eof);
}
void digest_row_content(std::istream &is, int field_count, Row_of_fields &row, String_storage &storage, bool &is_eof)
{
boost::uint8_t size;
Protocol_chunk<boost::uint8_t> proto_size(size);
is >> proto_size;
if (size == 0xfe)
{
/* EOF packet is detected and there are no more rows to be expeced. */
is_eof= true;
struct st_eof_package eof;
prot_parse_eof_message(is, eof);
return;
}
is.putback((char)size);
for(int field_no=0; field_no < field_count; ++field_no)
{
std::string *storage= new std::string;
Protocol_chunk_string_len proto_value(*storage);
is >> proto_value;
Value value(MYSQL_TYPE_VAR_STRING, storage->length(), storage->c_str());
row.push_back(value);
}
}
}} // end namespace system, mysql

View File

@ -1,185 +0,0 @@
/*
Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights
reserved.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
02110-1301 USA
*/
#ifndef _RESULTSET_ITERATOR_H
#define _RESULTSET_ITERATOR_H
#include <iostream>
// if error; try #include <boost/iterator.hpp>
#include <boost/iterator/iterator_facade.hpp>
#include <boost/asio.hpp>
#include "value.h"
#include "rowset.h"
#include "row_of_fields.h"
using namespace mysql;
namespace mysql
{
struct Field_packet
{
std::string catalog; // Length Coded String
std::string db; // Length Coded String
std::string table; // Length Coded String
std::string org_table;// Length Coded String
std::string name; // Length Coded String
std::string org_name; // Length Coded String
boost::uint8_t marker; // filler
boost::uint16_t charsetnr; // charsetnr
boost::uint32_t length; // length
boost::uint8_t type; // field type
boost::uint16_t flags;
boost::uint8_t decimals;
boost::uint16_t filler; // filler, always 0x00
//boost::uint64_t default_value; // Length coded binary; only in table descr.
};
typedef std::list<std::string > String_storage;
namespace system {
void digest_result_header(std::istream &is, boost::uint64_t &field_count, boost::uint64_t extra);
void digest_field_packet(std::istream &is, Field_packet &field_packet);
void digest_marker(std::istream &is);
void digest_row_content(std::istream &is, int field_count, Row_of_fields &row, String_storage &storage, bool &is_eof);
}
template <class T>
class Result_set_iterator;
class Result_set
{
public:
typedef Result_set_iterator<Row_of_fields > iterator;
typedef Result_set_iterator<Row_of_fields const > const_iterator;
Result_set(tcp::socket *socket) { source(socket); }
void source(tcp::socket *socket) { m_socket= socket; digest_row_set(); }
iterator begin();
iterator end();
const_iterator begin() const;
const_iterator end() const;
private:
void digest_row_set();
friend class Result_set_iterator<Row_of_fields >;
friend class Result_set_iterator<Row_of_fields const>;
std::vector<Field_packet > m_field_types;
int m_row_count;
std::vector<Row_of_fields > m_rows;
String_storage m_storage;
tcp::socket *m_socket;
typedef enum { RESULT_HEADER,
FIELD_PACKETS,
MARKER,
ROW_CONTENTS,
EOF_PACKET
} state_t;
state_t m_current_state;
/**
* The number of fields in the field packets block
*/
boost::uint64_t m_field_count;
/**
* Used for SHOW COLUMNS to return the number of rows in the table
*/
boost::uint64_t m_extra;
};
template <class Iterator_value_type >
class Result_set_iterator :
public boost::iterator_facade<Result_set_iterator<Iterator_value_type >,
Iterator_value_type,
boost::forward_traversal_tag >
{
public:
Result_set_iterator() : m_feeder(0), m_current_row(-1)
{}
explicit Result_set_iterator(Result_set *feeder) : m_feeder(feeder),
m_current_row(-1)
{
increment();
}
private:
friend class boost::iterator_core_access;
void increment()
{
if (++m_current_row >= m_feeder->m_row_count)
m_current_row= -1;
}
bool equal(const Result_set_iterator& other) const
{
if (other.m_feeder == 0 && m_feeder == 0)
return true;
if (other.m_feeder == 0)
{
if (m_current_row == -1)
return true;
else
return false;
}
if (m_feeder == 0)
{
if (other.m_current_row == -1)
return true;
else
return false;
}
if( other.m_feeder->m_field_count != m_feeder->m_field_count)
return false;
Iterator_value_type *row1= &m_feeder->m_rows[m_current_row];
Iterator_value_type *row2= &other.m_feeder->m_rows[m_current_row];
for (unsigned i=0; i< m_feeder->m_field_count; ++i)
{
Value val1= row1->at(i);
Value val2= row2->at(i);
if (val1 != val2)
return false;
}
return true;
}
Iterator_value_type &dereference() const
{
return m_feeder->m_rows[m_current_row];
}
private:
Result_set *m_feeder;
int m_current_row;
};
} // end namespace mysql
#endif /* _RESULTSET_ITERATOR_H */

View File

@ -1,51 +0,0 @@
/*
Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights
reserved.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
02110-1301 USA
*/
#include <vector>
#include "row_of_fields.h"
#include <stdexcept>
#include <boost/foreach.hpp>
#include "value.h"
using namespace mysql;
Row_of_fields& Row_of_fields::operator=(const Row_of_fields &right)
{
if (size() != right.size())
throw std::length_error("Row dimension doesn't match.");
int i= 0;
BOOST_FOREACH(Value value, right)
{
this->assign(++i, value);
}
return *this;
}
Row_of_fields& Row_of_fields::operator=(Row_of_fields &right)
{
if (size() != right.size())
throw std::length_error("Row dimension doesn't match.");
int i= 0;
BOOST_FOREACH(Value value, right)
{
this->assign(++i, value);
}
return *this;
}

View File

@ -1,47 +0,0 @@
/*
Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights
reserved.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
02110-1301 USA
*/
#ifndef _ROW_OF_FIELDS_H
#define _ROW_OF_FIELDS_H
#include <vector>
#include <iostream>
#include "value.h"
using namespace mysql;
namespace mysql
{
class Row_of_fields : public std::vector<Value >
{
public:
Row_of_fields() : std::vector<Value >(0) { }
Row_of_fields(int field_count) : std::vector<Value >(field_count) {}
virtual ~Row_of_fields() {}
Row_of_fields& operator=(const Row_of_fields &right);
Row_of_fields& operator=(Row_of_fields &right);
private:
};
}
#endif /* _ROW_OF_FIELDS_H */

View File

@ -1,55 +0,0 @@
/*
Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights
reserved.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
02110-1301 USA
*/
#ifndef _ROWSET_H
#define _ROWSET_H
#include "field_iterator.h"
#include "resultset_iterator.h"
#include <boost/function.hpp>
#include <boost/iterator.hpp>
using namespace mysql;
namespace mysql {
class Row_event;
class Table_map_event;
class Row_event_set
{
public:
typedef Row_event_iterator<Row_of_fields > iterator;
typedef Row_event_iterator<Row_of_fields const > const_iterator;
Row_event_set(Row_event *arg1, Table_map_event *arg2) { source(arg1, arg2); }
iterator begin() { return iterator(m_row_event, m_table_map_event); }
iterator end() { return iterator(); }
const_iterator begin() const { return const_iterator(m_row_event, m_table_map_event); }
const_iterator end() const { return const_iterator(); }
private:
void source(Row_event *arg1, Table_map_event *arg2) { m_row_event= arg1; m_table_map_event= arg2; }
Row_event *m_row_event;
Table_map_event *m_table_map_event;
};
}
#endif /* _ROWSET_H */

File diff suppressed because it is too large Load Diff

View File

@ -1,310 +0,0 @@
/*
Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights
reserved.
Copyright (c) 2013-2014, MariaDB Corporation Ab
Portions of this file contain modifications contributed and copyrighted by
MariaDB Corporation, Ab. Those modifications are gratefully acknowledged and are described
briefly in the source code.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
02110-1301 USA
*/
/*
MariaDB Corporation change details:
- Added support for GTID event handling for both MySQL and MariaDB
- Added support for starting binlog dump from GTID position
- Added support for MariaDB server
Author: Jan Lindström (jan.lindstrom@mariadb.com
*/
#ifndef _TCP_DRIVER_H
#define _TCP_DRIVER_H
#include "binlog_driver.h"
#include "protocol.h"
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include "bounded_buffer.h"
#include "gtid.h"
#include <mysql.h>
#define MAX_PACKAGE_SIZE 0xffffff
#define GET_NEXT_PACKET_HEADER \
boost::asio::async_read(*m_socket, boost::asio::buffer(m_net_header, 4), \
boost::bind(&Binlog_tcp_driver::handle_net_packet_header, this, \
boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)) \
using boost::asio::ip::tcp;
namespace mysql { namespace system {
class Binlog_tcp_driver : public Binary_log_driver
{
public:
Binlog_tcp_driver(const std::string& user, const std::string& passwd,
const std::string& host, unsigned long port)
: Binary_log_driver("", 4), m_host(host), m_user(user), m_passwd(passwd),
m_port(port), m_socket(NULL), m_waiting_event(0), m_event_loop(0),
m_total_bytes_transferred(0), m_shutdown(false), m_packet_no(0),
m_event_queue(new bounded_buffer<Binary_log_event*>(50))
{
}
~Binlog_tcp_driver()
{
delete m_event_queue;
delete m_socket;
}
/**
* Connect using previously declared connection parameters.
*/
int connect();
int connect(const Gtid gtid);
int connect(const boost::uint64_t binlog_pos);
/**
* Blocking wait for the next binary log event to reach the client
*/
int wait_for_next_event(mysql::Binary_log_event **event);
/**
* Reconnects to the master with a new binlog dump request.
*/
int set_position(const std::string &str, unsigned long position);
/**
* Reconnects to the master with a new binlog dump request.
*/
int set_position_gtid(const Gtid gtid);
int get_position(std::string *str, unsigned long *position);
const std::string& user() const { return m_user; }
const std::string& password() const { return m_passwd; }
const std::string& host() const { return m_host; }
unsigned long port() const { return m_port; }
int fetch_server_version(const std::string& user,
const std::string& passwd,
const std::string& host,
long port);
protected:
/**
* Connects to a mysql server, authenticates and initiates the event
* request loop.
*
* @param user The user account on the server side
* @param passwd The password used to authenticate the user
* @param host The DNS host name or IP of the server
* @param port The service port number to connect to
*
*
* @return Success or failure code
* @retval 0 Successfully established a connection
* @retval >1 An error occurred.
*/
int connect(const std::string& user, const std::string& passwd,
const std::string& host, long port,
const Gtid gtid = Gtid(),
const std::string& binlog_filename="", size_t offset=4);
bool send_client_capabilites(tcp::socket *socket);
bool send_slave_connect_state(tcp::socket *socket,Gtid gtid);
bool get_master_binlog_checksum(tcp::socket *socket);
tcp::socket *sync_connect_and_authenticate(boost::asio::io_service &io_service,
const std::string &user,
const std::string &passwd,
const std::string &host,
long port);
int authenticate(tcp::socket *socket,
const std::string& user,
const std::string& passwd,
const st_handshake_package &handshake_package);
bool fetch_master_status(tcp::socket *socket,
std::string *filename,
unsigned long *position);
bool fetch_binlogs_name_and_size(tcp::socket *socket,
std::map<std::string,
unsigned long> &binlog_map);
private:
/**
* Request a binlog dump and starts the event loop in a new thread
* @param binlog_file_name The base name of the binlog files to query
*
*/
void start_binlog_dump(const std::string &binlog_file_name, size_t offset);
void start_binlog_dump(const Gtid gtid);
/**
* Handles a completed mysql server package header and put a
* request for the body in the job queue.
*/
void handle_net_packet_header(const boost::system::error_code& err, std::size_t bytes_transferred);
/**
* Handles a completed network package with the assumption that it contains
* a binlog event.
*
* TODO rename to handle_event_log_packet?
*/
void handle_net_packet(const boost::system::error_code& err, std::size_t bytes_transferred);
/**
* Called from handle_net_packet(). The function handle a stream of bytes
* representing event packets which may or may not be complete.
* It uses m_waiting_event and the size of the stream as parameters
* in a state machine. If there is no m_waiting_event then the event
* header must be parsed for the event packet length. This can only
* be done if the accumulated stream of bytes are more than 19.
* Next, if there is a m_waiting_event, it can only be completed if
* event_length bytes are waiting on the stream.
*
* If none of these conditions are fullfilled, the function exits without
* any action.
*
* @param err Not used
* @param bytes_transferred The number of bytes waiting in the event stream
*
*/
void handle_event_packet(const boost::system::error_code& err, std::size_t bytes_transferred);
/**
* Executes io_service in a loop.
* TODO Checks for connection errors and reconnects to the server
* if necessary.
*/
void start_event_loop(void);
/**
* Reconnect to the server by first calling disconnect and then connect.
*/
void reconnect(Gtid gtid = Gtid());
/**
* Disconnet from the server. The io service must have been stopped before
* this function is called.
* The event queue is emptied.
*/
void disconnect(void);
/**
* Terminates the io service and sets the shudown flag.
* this causes the event loop to terminate.
*/
void shutdown(void);
boost::thread *m_event_loop;
boost::asio::io_service m_io_service;
tcp::socket *m_socket;
bool m_shutdown;
/**
* Temporary storage for a handshake package
*/
st_handshake_package m_handshake_package;
/**
* Temporary storage for an OK package
*/
st_ok_package m_ok_package;
/**
* Temporary storage for an error package
*/
st_error_package m_error_package;
/**
* each bin log event starts with a 19 byte long header
* We use this sturcture every time we initiate an async
* read.
*/
boost::uint8_t m_event_header[19];
/**
*
*/
boost::uint8_t m_net_header[4];
/**
*
*/
boost::uint8_t m_net_packet[MAX_PACKAGE_SIZE];
boost::asio::streambuf m_event_stream_buffer;
char * m_event_packet;
/**
* This pointer points to an object constructed from event
* stream during async communication with
* server. If it is 0 it means that no event has been
* constructed yet.
*/
Log_event_header *m_waiting_event;
Log_event_header m_log_event_header;
/**
* A ring buffer used to dispatch aggregated events to the user application
*/
bounded_buffer<Binary_log_event *> *m_event_queue;
std::string m_user;
std::string m_host;
std::string m_passwd;
long m_port;
boost::uint32_t m_packet_no;
boost::uint64_t m_total_bytes_transferred;
};
/**
* Sends a SHOW MASTER STATUS command to the server and retrieve the
* current binlog position.
*
* @return False if the operation succeeded, true if it failed.
*/
bool fetch_master_status(tcp::socket *socket, std::string *filename, unsigned long *position);
/**
* Sends a SHOW BINARY LOGS command to the server and stores the file
* names and sizes in a map.
*/
bool fetch_binlogs_name_and_size(tcp::socket *socket, std::map<std::string, unsigned long> &binlog_map);
int authenticate(tcp::socket *socket, const std::string& user,
const std::string& passwd,
const st_handshake_package &handshake_package);
tcp::socket *
sync_connect_and_authenticate(boost::asio::io_service &io_service, const std::string &user,
const std::string &passwd, const std::string &host, long port);
} }
#endif /* _TCP_DRIVER_H */

View File

@ -1,35 +0,0 @@
project(tests)
cmake_minimum_required (VERSION 2.6)
include_directories(../.)
include_directories(../table_replication_consistency)
FIND_LIBRARY(CRYPTO NAMES libcrypto.a /opt/local/lib /opt/lib /usr/lib /usr/local/lib /usr/local/ssl/lib)
FIND_LIBRARY(SSL NAMES libssl.a /opt/local/lib /opt/lib /usr/lib /usr/local/lib /usr/local/ssl/lib)
FIND_LIBRARY(REPLICATION replication /opt/local/lib /opt/lib /usr/lib /usr/local/lib ../)
# Find MySQL client library and header files
find_path(MySQL_INCLUDE_DIR mysql.h
/usr/local/include/mysql /usr/include/mysql /usr/local/mysql/include)
include_directories(${MySQL_INCLUDE_DIR})
include_directories(../../table_replication_consistency)
include_directories(../../utils)
# Find MySQL client library and header files
find_library(MySQL_LIBRARY NAMES libmysqld.a PATHS
/usr/lib64/mysql /usr/lib/mysql /usr/local/mysql/lib ${MARIADB_SRC_PATH}/lib)
SET(Boost_DEBUG FALSE)
SET(Boost_FIND_REQUIRED TRUE)
SET(Boost_FIND_QUIETLY TRUE)
SET(Boost_USE_STATIC_LIBS FALSE)
SET(Boost_ADDITIONAL_VERSIONS "1.41" "1.41.0")
FIND_PACKAGE(Boost REQUIRED system thread)
# Create build rules for all the simple examples that only require a
# single file.
foreach(prog event_dump)
ADD_EXECUTABLE(${prog} ${prog}.cpp /usr/local/mysql/lib/libmysqld.a)
TARGET_LINK_LIBRARIES(${prog} ${REPLICATION} boost_system boost_thread pthread aio ${SSL} ${CRYPTO} crypt z dl ${MySQL_LIBRARY})
endforeach()

View File

@ -1,243 +0,0 @@
/*
Copyright (C) 2013, MariaDB Corporation Ab
This file is distributed as part of the MariaDB Corporation MaxScale. It is free
software: you can redistribute it and/or modify it under the terms of the
GNU General Public License as published by the Free Software Foundation,
version 2.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
details.
You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc., 51
Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
Author: Jan Lindström jan.lindstrom@mariadb.com
*/
#include "binlog_api.h"
#include "listener_exception.h"
#include "table_replication_consistency.h"
#include <getopt.h>
#include <iostream>
#include <iomanip>
#include <map>
#include <sstream>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <regex.h>
#include <algorithm>
#include <my_global.h>
#include <mysql.h>
#include "../gtid.h"
using mysql::Binary_log;
using mysql::system::create_transport;
using namespace std;
using namespace mysql;
using namespace mysql::system;
static char* server_options[] = {
(char *)"event_dump",
(char *)"--datadir=/tmp/",
(char *)"--skip-innodb",
(char *)"--default-storage-engine=myisam",
NULL
};
const int num_elements = (sizeof(server_options) / sizeof(char *)) - 1;
static char* server_groups[] = {
(char *)"libmysqld_server",
(char *)"libmysqld_client",
(char *)"libmysqld_server",
(char *)"libmysqld_server",
NULL
};
void* binlog_reader(void * arg)
{
replication_listener_t *rlt = (replication_listener_t*)arg;
char *uri = rlt->server_url;
map<int, string> tid2tname;
map<int, string>::iterator tb_it;
pthread_t id = pthread_self();
string database_dot_table;
const char* server_type;
Gtid gtid = Gtid();
try {
Binary_log binlog(create_transport(uri));
binlog.connect();
server_type = binlog.get_mysql_server_type_str();
cout << "Server " << uri << " type: " << server_type << endl;
Binary_log_event *event;
while (true) {
Log_event_header *lheader;
int result = binlog.wait_for_next_event(&event);
if (result == ERR_EOF)
break;
lheader = event->header();
switch(event->get_event_type()) {
case QUERY_EVENT: {
Query_event *qevent = dynamic_cast<Query_event *>(event);
std::cout << "Thread: " << id << " server_id " << lheader->server_id
<< " position " << lheader->next_position << " : Found event of type "
<< event->get_event_type()
<< " txt " << get_event_type_str(event->get_event_type())
<< " query " << qevent->query << " db " << qevent->db_name
<< std::endl;
break;
}
case GTID_EVENT_MARIADB:
case GTID_EVENT_MYSQL: {
Gtid_event *gevent = dynamic_cast<Gtid_event *>(event);
std::cout << "Thread: " << id << " server_id " << lheader->server_id
<< " position " << lheader->next_position << " : Found event of type "
<< event->get_event_type()
<< " txt " << get_event_type_str(event->get_event_type())
<< " GTID " << std::string((char *)gevent->m_gtid.get_gtid())
<< " GTID " << gevent->m_gtid.get_string()
<< std::endl;
break;
}
case TABLE_MAP_EVENT: {
Table_map_event *table_map_event= dynamic_cast<Table_map_event*>(event);
database_dot_table= table_map_event->db_name;
database_dot_table.append(".");
database_dot_table.append(table_map_event->table_name);
tid2tname[table_map_event->table_id]= database_dot_table;
break;
}
case WRITE_ROWS_EVENT:
case UPDATE_ROWS_EVENT:
case DELETE_ROWS_EVENT: {
Row_event *revent = dynamic_cast<Row_event*>(event);
tb_it= tid2tname.begin();
tb_it= tid2tname.find(revent->table_id);
if (tb_it != tid2tname.end())
{
database_dot_table= tb_it->second;
}
std::cout << "Thread: " << id << " server_id " << lheader->server_id
<< " position " << lheader->next_position << " : Found event of type "
<< event->get_event_type()
<< " txt " << get_event_type_str(event->get_event_type())
<< " table " << revent->table_id
<< " tb " << database_dot_table
<< std::endl;
break;
}
default:
break;
} // switch
} // while
} // try
catch(ListenerException e)
{
std::cerr << "Listener exception: " << e.what() << std::endl;
}
catch(boost::system::error_code e)
{
std::cerr << "Listener system error: " << e.message() << std::endl;
}
// Try and catch all exceptions
catch(std::exception const& e)
{
std::cerr << "Listener other error: " << e.what() << std::endl;
}
// Rest of them
catch(...)
{
std::cerr << "Unknown exception: " << std::endl;
// Re-Throw this one.
// It was not handled so you want to make sure it is handled correctly by
// the OS. So just allow the exception to keep propagating.
throw;
}
pthread_exit(NULL);
return NULL;
}
int main(int argc, char** argv) {
int number_of_args = argc;
int i=0,k=0;
pthread_t *tid=NULL;
char *uri;
replication_listener_t *mrl;
int err=0;
tid = (pthread_t*)malloc(sizeof(pthread_t) * argc);
mrl = (replication_listener_t*)calloc(argc, sizeof(replication_listener_t));
if (argc < 2) {
std::cerr << "Usage: event_dump <uri>" << std::endl;
exit(2);
}
if (mysql_library_init(num_elements, server_options, server_groups)) {
std::cerr << "Failed to init MySQL server" << std::endl;
exit(1);
}
argc =0;
while(argc != number_of_args)
{
uri= argv[argc++];
if ( strncmp("mysql://", uri, 8) == 0) {
mrl[i].server_url = uri;
if (argc == 1) {
mrl[i].is_master = 1;
}
err = pthread_create(&(tid[i++]), NULL, &binlog_reader, (void *)&mrl[i]);
if (err ) {
perror(NULL);
break;
}
}
}//end of outer while loop
for(k=0; k < i; k++)
{
err = pthread_join(tid[k], (void **)&(mrl[k]));
if (err) {
perror(NULL);
}
}
exit(0);
}

View File

@ -1,609 +0,0 @@
/*
Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights
reserved.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
02110-1301 USA
*/
#include "value.h"
#include "binlog_event.h"
#include <boost/lexical_cast.hpp>
#include <iomanip>
#include <boost/format.hpp>
#include <mysql.h>
#define DIG_PER_DEC1 9
using namespace mysql;
using namespace mysql::system;
namespace mysql {
static const int dig2bytes[DIG_PER_DEC1 + 1] = {0, 1, 1, 2, 2, 3, 3, 4, 4, 4};
int decimal_bin_size(int precision, int scale)
{
int intg = precision - scale;
int intg0 = intg / DIG_PER_DEC1;
int frac0 = scale / DIG_PER_DEC1;
int intg0x = intg - intg0 * DIG_PER_DEC1;
int frac0x = scale - frac0 * DIG_PER_DEC1;
return(
intg0 * sizeof(boost::int32_t) + dig2bytes[intg0x] +
frac0 * sizeof(boost::int32_t) + dig2bytes[frac0x]
);
}
int calc_field_size(unsigned char column_type, const unsigned char *field_ptr, boost::uint32_t metadata)
{
boost::uint32_t length;
switch (column_type) {
case MYSQL_TYPE_VAR_STRING:
/* This type is hijacked for result set types. */
length= metadata;
break;
case MYSQL_TYPE_NEWDECIMAL:
{
int precision = (metadata & 0xff);
int scale = metadata >> 8;
length = decimal_bin_size(precision, scale);
break;
}
case MYSQL_TYPE_DECIMAL:
case MYSQL_TYPE_FLOAT:
case MYSQL_TYPE_DOUBLE:
length= metadata;
break;
/*
The cases for SET and ENUM are include for completeness, however
both are mapped to type MYSQL_TYPE_STRING and their real types
are encoded in the field metadata.
*/
case MYSQL_TYPE_SET:
case MYSQL_TYPE_ENUM:
case MYSQL_TYPE_STRING:
{
//unsigned char type= metadata >> 8U;
unsigned char type = metadata & 0xff;
if ((type == MYSQL_TYPE_SET) || (type == MYSQL_TYPE_ENUM))
{
//length= metadata & 0x00ff;
length = (metadata & 0xff00) >> 8;
}
else
{
/*
We are reading the actual size from the master_data record
because this field has the actual lengh stored in the first
byte.
*/
length= (unsigned int) *field_ptr+1;
//DBUG_ASSERT(length != 0);
}
break;
}
case MYSQL_TYPE_YEAR:
case MYSQL_TYPE_TINY:
length= 1;
break;
case MYSQL_TYPE_SHORT:
length= 2;
break;
case MYSQL_TYPE_INT24:
length= 3;
break;
case MYSQL_TYPE_LONG:
length= 4;
break;
case MYSQL_TYPE_LONGLONG:
length= 8;
break;
case MYSQL_TYPE_NULL:
length= 0;
break;
case MYSQL_TYPE_NEWDATE:
length= 3;
break;
case MYSQL_TYPE_DATE:
case MYSQL_TYPE_TIME:
length= 3;
break;
case MYSQL_TYPE_TIMESTAMP:
length= 4;
break;
case MYSQL_TYPE_DATETIME:
length= 8;
break;
case MYSQL_TYPE_BIT:
{
/*
Decode the size of the bit field from the master.
from_len is the length in bytes from the master
from_bit_len is the number of extra bits stored in the master record
If from_bit_len is not 0, add 1 to the length to account for accurate
number of bytes needed.
*/
boost::uint32_t from_len= (metadata >> 8U) & 0x00ff;
boost::uint32_t from_bit_len= metadata & 0x00ff;
//DBUG_ASSERT(from_bit_len <= 7);
length= from_len + ((from_bit_len > 0) ? 1 : 0);
break;
}
case MYSQL_TYPE_VARCHAR:
{
length= metadata > 255 ? 2 : 1;
length+= length == 1 ? (boost::uint32_t) *field_ptr : *((boost::uint16_t *)field_ptr);
break;
}
case MYSQL_TYPE_TINY_BLOB:
case MYSQL_TYPE_MEDIUM_BLOB:
case MYSQL_TYPE_LONG_BLOB:
case MYSQL_TYPE_BLOB:
case MYSQL_TYPE_GEOMETRY:
{
switch (metadata)
{
case 1:
length= 1+ (boost::uint32_t) field_ptr[0];
break;
case 2:
length= 2+ (boost::uint32_t) (*(boost::uint16_t *)(field_ptr) & 0xFFFF);
break;
case 3:
// TODO make platform indep.
length= 3+ (boost::uint32_t) (long) (*((boost::uint32_t *) (field_ptr)) & 0xFFFFFF);
break;
case 4:
// TODO make platform indep.
length= 4+ (boost::uint32_t) (long) *((boost::uint32_t *) (field_ptr));
break;
default:
length= 0;
break;
}
break;
}
default:
length= ~(boost::uint32_t) 0;
}
return length;
}
/*
Value::Value(Value &val)
{
m_size= val.length();
m_storage= val.storage();
m_type= val.type();
m_metadata= val.metadata();
m_is_null= val.is_null();
}
*/
Value::Value(const Value& val)
{
m_size= val.m_size;
m_storage= val.m_storage;
m_type= val.m_type;
m_metadata= val.m_metadata;
m_is_null= val.m_is_null;
}
Value &Value::operator=(const Value &val)
{
m_size= val.m_size;
m_storage= val.m_storage;
m_type= val.m_type;
m_metadata= val.m_metadata;
m_is_null= val.m_is_null;
return *this;
}
bool Value::operator==(const Value &val) const
{
return (m_size == val.m_size) &&
(m_storage == val.m_storage) &&
(m_type == val.m_type) &&
(m_metadata == val.m_metadata);
}
bool Value::operator!=(const Value &val) const
{
return !operator==(val);
}
char *Value::as_c_str(unsigned long &size) const
{
if (m_is_null || m_size == 0)
{
size= 0;
return 0;
}
/*
Length encoded; First byte is length of string.
*/
int metadata_length= m_size > 251 ? 2: 1;
/*
Size is length of the character string; not of the entire storage
*/
size= m_size - metadata_length;
char *str = const_cast<char *>(m_storage + metadata_length);
if (m_type == MYSQL_TYPE_VARCHAR && m_metadata > 255) {
str++;
size--;
}
return str;
}
unsigned char *Value::as_blob(unsigned long &size) const
{
if (m_is_null || m_size == 0)
{
size= 0;
return 0;
}
/*
Size was calculated during construction of the object and only inludes the
size of the blob data, not the metadata part which also is stored in the
storage. For blobs this part can be between 1-4 bytes long.
*/
size= m_size - m_metadata;
/*
Adjust the storage pointer with the size of the metadata.
*/
return (unsigned char *)(m_storage + m_metadata);
}
boost::int32_t Value::as_int32() const
{
if (m_is_null)
{
return 0;
}
boost::uint32_t to_int;
Protocol_chunk<boost::uint32_t> prot_integer(to_int);
buffer_source buff(m_storage, m_size);
buff >> prot_integer;
return to_int;
}
boost::int8_t Value::as_int8() const
{
if (m_is_null)
{
return 0;
}
boost::int8_t to_int;
Protocol_chunk<boost::int8_t> prot_integer(to_int);
buffer_source buff(m_storage, m_size);
buff >> prot_integer;
return to_int;
}
boost::int16_t Value::as_int16() const
{
if (m_is_null)
{
return 0;
}
boost::int16_t to_int;
Protocol_chunk<boost::int16_t> prot_integer(to_int);
buffer_source buff(m_storage, m_size);
buff >> prot_integer;
return to_int;
}
boost::int64_t Value::as_int64() const
{
if (m_is_null)
{
return 0;
}
boost::int64_t to_int;
Protocol_chunk<boost::int64_t> prot_integer(to_int);
buffer_source buff(m_storage, m_size);
buff >> prot_integer;
return to_int;
}
float Value::as_float() const
{
// TODO
return *((const float *)storage());
}
double Value::as_double() const
{
// TODO
return *((const double *)storage());
}
void Converter::to(std::string &str, const Value &val) const
{
if (val.is_null())
{
str= "(NULL)";
return;
}
switch(val.type())
{
case MYSQL_TYPE_DECIMAL:
str= "not implemented";
break;
case MYSQL_TYPE_TINY:
str= boost::lexical_cast<std::string>(static_cast<int>(val.as_int8()));
break;
case MYSQL_TYPE_SHORT:
str= boost::lexical_cast<std::string>(val.as_int16());
break;
case MYSQL_TYPE_LONG:
str= boost::lexical_cast<std::string>(val.as_int32());
break;
case MYSQL_TYPE_FLOAT:
{
str= boost::str(boost::format("%d") % val.as_float());
}
break;
case MYSQL_TYPE_DOUBLE:
str= boost::str(boost::format("%d") % val.as_double());
break;
case MYSQL_TYPE_NULL:
str= "not implemented";
break;
case MYSQL_TYPE_TIMESTAMP:
str= boost::lexical_cast<std::string>((boost::uint32_t)val.as_int32());
break;
case MYSQL_TYPE_LONGLONG:
str= boost::lexical_cast<std::string>(val.as_int64());
break;
case MYSQL_TYPE_INT24:
str= "not implemented";
break;
case MYSQL_TYPE_DATE:
{
const char* val_storage = val.storage();
unsigned int date_val = (val_storage[0] & 0xff) + ((val_storage[1] & 0xff) << 8) + ((val_storage[2] & 0xff) << 16);
unsigned int date_year = date_val >> 9;
date_val -= (date_year << 9);
unsigned int date_month = date_val >> 5;
unsigned int date_day = date_val - (date_month << 5);
str = boost::str(boost::format("%04d-%02d-%02d") % date_year % date_month % date_day);
break;
}
case MYSQL_TYPE_DATETIME:
{
boost::uint64_t timestamp= val.as_int64();
unsigned long d= timestamp / 1000000;
unsigned long t= timestamp % 1000000;
std::ostringstream os;
os << std::setfill('0') << std::setw(4) << d / 10000
<< std::setw(1) << '-'
<< std::setw(2) << (d % 10000) / 100
<< std::setw(1) << '-'
<< std::setw(2) << d % 100
<< std::setw(1) << ' '
<< std::setw(2) << t / 10000
<< std::setw(1) << ':'
<< std::setw(2) << (t % 10000) / 100
<< std::setw(1) << ':'
<< std::setw(2) << t % 100;
str= os.str();
}
break;
case MYSQL_TYPE_TIME:
{
const char* val_storage = val.storage();
unsigned int time_val = (val_storage[0] & 0xff) + ((val_storage[1] & 0xff) << 8) + ((val_storage[2] & 0xff) << 16);
unsigned int time_sec = time_val % 100;
time_val -= time_sec;
unsigned int time_min = (time_val % 10000) / 100;
unsigned int time_hour = (time_val - time_min) / 10000;
str = boost::str(boost::format("%02d:%02d:%02d") % time_hour % time_min % time_sec);
break;
}
case MYSQL_TYPE_YEAR:
{
const char* val_storage = val.storage();
unsigned int year_val = (val_storage[0] & 0xff);
year_val = year_val > 0 ? (year_val + 1900) : 0;
str = boost::str(boost::format("%04d") % year_val);
break;
}
case MYSQL_TYPE_NEWDATE:
str= "not implemented";
break;
case MYSQL_TYPE_VARCHAR:
{
unsigned long size;
char *ptr= val.as_c_str(size);
str.append(ptr, size);
}
break;
case MYSQL_TYPE_VAR_STRING:
{
str.append(val.storage(), val.length());
}
break;
case MYSQL_TYPE_STRING:
{
unsigned char str_type = 0;
if (val.metadata()) {
str_type = val.metadata() & 0xff;
}
if (str_type == MYSQL_TYPE_SET) {
str = "not implemented";
break;
} else if (str_type == MYSQL_TYPE_ENUM) {
unsigned int val_storage = static_cast<unsigned int>(*val.storage());
str = boost::str(boost::format("%u") % val_storage);
break;
}
unsigned long size;
char *ptr= val.as_c_str(size);
str.append(ptr, size);
}
break;
case MYSQL_TYPE_BIT:
str= "not implemented";
break;
case MYSQL_TYPE_NEWDECIMAL:
str= "not implemented";
break;
case MYSQL_TYPE_ENUM:
str= "not implemented";
break;
case MYSQL_TYPE_SET:
str= "not implemented";
break;
case MYSQL_TYPE_TINY_BLOB:
case MYSQL_TYPE_MEDIUM_BLOB:
case MYSQL_TYPE_LONG_BLOB:
case MYSQL_TYPE_BLOB:
{
unsigned long size;
unsigned char *ptr= val.as_blob(size);
str.append((const char *)ptr, size);
}
break;
case MYSQL_TYPE_GEOMETRY:
str= "not implemented";
break;
default:
str= "not implemented";
break;
}
}
void Converter::to(float &out, const Value &val) const
{
switch(val.type())
{
case MYSQL_TYPE_FLOAT:
out= val.as_float();
break;
default:
out= 0;
}
}
void Converter::to(long &out, const Value &val) const
{
switch(val.type())
{
case MYSQL_TYPE_DECIMAL:
// TODO
out= 0;
break;
case MYSQL_TYPE_TINY:
out= val.as_int8();
break;
case MYSQL_TYPE_SHORT:
out= val.as_int16();
break;;
case MYSQL_TYPE_LONG:
out= (long)val.as_int32();
break;
case MYSQL_TYPE_FLOAT:
out= 0;
break;
case MYSQL_TYPE_DOUBLE:
out= (long)val.as_double();
case MYSQL_TYPE_NULL:
out= 0;
break;
case MYSQL_TYPE_TIMESTAMP:
out=(boost::uint32_t)val.as_int32();
break;
case MYSQL_TYPE_LONGLONG:
out= (long)val.as_int64();
break;
case MYSQL_TYPE_INT24:
out= 0;
break;
case MYSQL_TYPE_DATE:
out= 0;
break;
case MYSQL_TYPE_TIME:
out= 0;
break;
case MYSQL_TYPE_DATETIME:
out= (long)val.as_int64();
break;
case MYSQL_TYPE_YEAR:
out= 0;
break;
case MYSQL_TYPE_NEWDATE:
out= 0;
break;
case MYSQL_TYPE_VARCHAR:
out= 0;
break;
case MYSQL_TYPE_BIT:
out= 0;
break;
case MYSQL_TYPE_NEWDECIMAL:
out= 0;
break;
case MYSQL_TYPE_ENUM:
out= 0;
break;
case MYSQL_TYPE_SET:
out= 0;
break;
case MYSQL_TYPE_TINY_BLOB:
case MYSQL_TYPE_MEDIUM_BLOB:
case MYSQL_TYPE_LONG_BLOB:
case MYSQL_TYPE_BLOB:
out= 0;
break;
case MYSQL_TYPE_VAR_STRING:
{
std::string str;
str.append(val.storage(), val.length());
out= boost::lexical_cast<long>(str.c_str());
}
break;
case MYSQL_TYPE_STRING:
out= 0;
break;
case MYSQL_TYPE_GEOMETRY:
out= 0;
break;
default:
out= 0;
break;
}
}
} // end namespace mysql

View File

@ -1,182 +0,0 @@
/*
Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights
reserved.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
02110-1301 USA
*/
#ifndef _VALUE_ADAPTER_H
#define _VALUE_ADAPTER_H
#include <boost/cstdint.hpp>
#include "protocol.h"
#include <boost/any.hpp>
#include <iostream>
#include <mysql.h>
using namespace mysql;
namespace mysql {
/**
This helper function calculates the size in bytes of a particular field in a
row type event as defined by the field_ptr and metadata_ptr arguments.
@param column_type Field type code
@param field_ptr The field data
@param metadata_ptr The field metadata
@note We need the actual field data because the string field size is not
part of the meta data. :(
@return The size in bytes of a particular field
*/
int calc_field_size(unsigned char column_type, const unsigned char *field_ptr,
boost::uint32_t metadata);
/**
* A value object class which encapsluate a tuple (value type, metadata, storage)
* and provide for views to this storage through a well defined interface.
*
* Can be used with a Converter to convert between different Values.
*/
class Value
{
public:
Value(enum enum_field_types type, boost::uint32_t metadata, const char *storage) :
m_type(type), m_storage(storage), m_metadata(metadata), m_is_null(false)
{
m_size= calc_field_size((unsigned char)type,
(const unsigned char*)storage,
metadata);
//std::cout << "TYPE: " << type << " SIZE: " << m_size << std::endl;
};
Value()
{
m_size= 0;
m_storage= 0;
m_metadata= 0;
m_is_null= false;
}
/**
* Copy constructor
*/
Value(const Value& val);
Value &operator=(const Value &val);
bool operator==(const Value &val) const;
bool operator!=(const Value &val) const;
~Value() {}
void is_null(bool s) { m_is_null= s; }
bool is_null(void) const { return m_is_null; }
const char *storage() const { return m_storage; }
/**
* Get the length in bytes of the entire storage (any metadata part +
* atual data)
*/
size_t length() const { return m_size; }
enum enum_field_types type() const { return m_type; }
boost::uint32_t metadata() const { return m_metadata; }
/**
* Returns the integer representation of a storage of a pre-specified
* type.
*/
boost::int32_t as_int32() const;
/**
* Returns the integer representation of a storage of pre-specified
* type.
*/
boost::int64_t as_int64() const;
/**
* Returns the integer representation of a storage of pre-specified
* type.
*/
boost::int8_t as_int8() const;
/**
* Returns the integer representation of a storage of pre-specified
* type.
*/
boost::int16_t as_int16() const;
/**
* Returns a pointer to the character data of a string type stored
* in the pre-defined storage.
* @note The position is an offset of the storage pointer determined
* by the metadata and type.
*
* @param[out] size The size in bytes of the character string.
*
*/
char *as_c_str(unsigned long &size) const;
/**
* Returns a pointer to the byte data of a blob type stored in the pre-
* defined storage.
* @note The position is an offset of the storage pointer determined
* by the metadata and type.
*
* @param[out] size The size in bytes of the blob data.
*/
unsigned char *as_blob(unsigned long &size) const;
float as_float() const;
double as_double() const;
private:
enum enum_field_types m_type;
size_t m_size;
const char *m_storage;
boost::uint32_t m_metadata;
bool m_is_null;
};
class Converter
{
public:
/**
* Converts and copies the sql value to a std::string object.
* @param[out] str The target string
* @param[in] val The value object to be converted
*/
void to(std::string &str, const Value &val) const;
/**
* Converts and copies the sql value to a long integer.
* @param[out] out The target variable
* @param[in] val The value object to be converted
*/
void to(long &out, const Value &val) const;
/**
* Converts and copies the sql value to a floating point number.
* @param[out] out The target variable
* @param[in] val The value object to be converted
*/
void to(float &out, const Value &val) const;
};
} // end namespace mysql
#endif /* _VALUE_ADAPTER_H */