Merge branch '2.2-mrm' into develop

This commit is contained in:
Johan Wikman
2017-11-07 12:06:29 +02:00
21 changed files with 1341 additions and 742 deletions

View File

@ -15,6 +15,10 @@ target_link_libraries(testfilter maxscale-common)
set_target_properties(testfilter PROPERTIES VERSION "1.0.0")
install_module(testfilter core)
add_library(examplecppfilter SHARED examplefilter.cc examplefiltersession.cc)
set_target_properties(examplecppfilter PROPERTIES VERSION "1.0.0")
install_module(examplecppfilter core)
add_library(testprotocol SHARED testprotocol.c)
set_target_properties(testprotocol PROPERTIES VERSION "1.0.0")
install_module(testprotocol core)

78
examples/examplefilter.cc Normal file
View File

@ -0,0 +1,78 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2020-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
// All log messages from this module are prefixed with this
#define MXS_MODULE_NAME "examplefilter"
#include "examplefilter.hh"
// This declares a module in MaxScale
extern "C" MXS_MODULE* MXS_CREATE_MODULE()
{
static MXS_MODULE info =
{
MXS_MODULE_API_FILTER,
MXS_MODULE_IN_DEVELOPMENT,
MXS_FILTER_VERSION,
"An example filter that does nothing",
"V1.0.0",
RCAP_TYPE_NONE,
&ExampleFilter::s_object, // This is defined in the MaxScale filter template
NULL, /* Process init. */
NULL, /* Process finish. */
NULL, /* Thread init. */
NULL, /* Thread finish. */
{
{ "an_example_parameter", MXS_MODULE_PARAM_STRING, "a-default-value" },
{ MXS_END_MODULE_PARAMS }
}
};
return &info;
}
ExampleFilter::ExampleFilter()
{
}
ExampleFilter::~ExampleFilter()
{
}
// static
ExampleFilter* ExampleFilter::create(const char* zName, char** pzOptions, MXS_CONFIG_PARAMETER* pParams)
{
return new ExampleFilter();
}
ExampleFilterSession* ExampleFilter::newSession(MXS_SESSION* pSession)
{
return ExampleFilterSession::create(pSession, this);
}
// static
void ExampleFilter::diagnostics(DCB* pDcb) const
{
}
// static
json_t* ExampleFilter::diagnostics_json() const
{
return NULL;
}
// static
uint64_t ExampleFilter::getCapabilities()
{
return RCAP_TYPE_NONE;
}

46
examples/examplefilter.hh Normal file
View File

@ -0,0 +1,46 @@
#pragma once
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2020-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include <maxscale/cppdefs.hh>
#include <maxscale/filter.hh>
#include "examplefiltersession.hh"
class ExampleFilter : public maxscale::Filter<ExampleFilter, ExampleFilterSession>
{
// Prevent copy-constructor and assignment operator usage
ExampleFilter(const ExampleFilter&);
ExampleFilter& operator = (const ExampleFilter&);
public:
~ExampleFilter();
// Creates a new filter instance
static ExampleFilter* create(const char* zName, char** pzOptions, MXS_CONFIG_PARAMETER* ppParams);
// Creates a new session for this filter
ExampleFilterSession* newSession(MXS_SESSION* pSession);
// Print diagnostics to a DCB
void diagnostics(DCB* pDcb) const;
// Returns JSON form diagnostic data
json_t* diagnostics_json() const;
// Get filter capabilities
uint64_t getCapabilities();
private:
// Used in the create function
ExampleFilter();
};

View File

@ -0,0 +1,43 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2020-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
// All log messages from this module are prefixed with this
#define MXS_MODULE_NAME "examplefilter"
#include "examplefiltersession.hh"
#include "examplefilter.hh"
ExampleFilterSession::ExampleFilterSession(MXS_SESSION* pSession)
: mxs::FilterSession(pSession)
{
}
ExampleFilterSession::~ExampleFilterSession()
{
}
//static
ExampleFilterSession* ExampleFilterSession::create(MXS_SESSION* pSession, const ExampleFilter* pFilter)
{
return new ExampleFilterSession(pSession);
}
int ExampleFilterSession::routeQuery(GWBUF* pPacket)
{
return mxs::FilterSession::routeQuery(pPacket);
}
int ExampleFilterSession::clientReply(GWBUF* pPacket)
{
return mxs::FilterSession::clientReply(pPacket);
}

View File

@ -0,0 +1,44 @@
#pragma once
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2020-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include <maxscale/cppdefs.hh>
#include <maxscale/filter.hh>
class ExampleFilter;
class ExampleFilterSession : public maxscale::FilterSession
{
// Prevent copy-constructor and assignment operator usage
ExampleFilterSession(const ExampleFilterSession&);
ExampleFilterSession& operator = (const ExampleFilterSession&);
public:
~ExampleFilterSession();
// Called when a client session has been closed
void close();
// Create a new filter session
static ExampleFilterSession* create(MXS_SESSION* pSession, const ExampleFilter* pFilter);
// Handle a query from the client
int routeQuery(GWBUF* pPacket);
// Handle a reply from server
int clientReply(GWBUF* pPacket);
private:
// Used in the create function
ExampleFilterSession(MXS_SESSION* pSession);
};

360
include/maxscale/future.hh Normal file
View File

@ -0,0 +1,360 @@
#pragma once
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2020-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include <maxscale/cppdefs.hh>
#include <memory>
#include <maxscale/debug.h>
#include <maxscale/semaphore.hh>
namespace maxscale
{
// Internal, not intended for public consumption.
class future_internal
{
public:
virtual ~future_internal() {}
};
/**
* The class template maxscale::future provides a mechanism to access the result of
* asynchronous operations. It is based upon C++11's std::future as documented here:
* http://en.cppreference.com/w/cpp/thread/future
*
* std::future uses C++11's rvalue references, which are not available on the
* environments where MaxScale is compiled. Consequently, some care is needed
* when using maxscale::future so that unintended copying does not occur.
*
* When C++11 is available, it should be straightforward to take the std::future
* into use.
*/
template<class T>
class future
{
public:
/**
* Constructs a future with no shared state.
*/
future()
{
}
/**
* Move constructor
*
* @note This looks like a regular copy-constructor, but should be treated as
* a move constructor.
*
* @param other The future to move. After the call, @c other will not refer
* to a future result.
*/
future(const future& other)
: m_sInternal(other.m_sInternal)
{
other.m_sInternal.reset();
}
/**
* Move a future.
*
* @note This looks like a regular assignment operator, but the assigned value
* is moved.
*
* @param rhs The future to move. After the call, @c rhs will not refer
* to a future result.
* @return *this
*/
future& operator = (const future& rhs)
{
future copy(rhs);
copy.swap(*this);
return *this;
}
/**
* Destructor
*/
~future()
{
}
/**
* Swap the content
*
* @param rhs The future to swap the contents with.
*/
void swap(future& rhs)
{
std::swap(m_sInternal, rhs.m_sInternal);
}
/**
* Checks if the future refers to a shared state
*
* @return True, if this future refers to shared state, false otherwise.
*/
bool valid() const
{
return m_sInternal.get() != NULL;
}
/**
* Waits until the future has a valid result and returns it.
*
* @note After the function returns, the future will no longer be valid.
*
* @return The stored value.
*/
T get()
{
ss_dassert(valid());
if (valid())
{
T rv = m_sInternal->get();
m_sInternal.reset();
return rv;
}
else
{
MXS_ERROR("Get called on non-valid future.");
return T();
}
}
/**
* Blocks until the result becomes available.
*
* @note Only a valid future can be waited for.
*/
void wait() const
{
ss_dassert(valid());
if (valid())
{
m_sInternal->wait();
}
else
{
MXS_ERROR("An attempt to wait on a non-valid future.");
}
}
public:
class internal : public future_internal
{
public:
internal()
: m_t()
, m_waited(false)
{
}
~internal()
{
}
T get()
{
wait();
return m_t;
}
void set(T t)
{
m_t = t;
m_sem.post();
}
void wait() const
{
if (!m_waited)
{
m_sem.wait();
m_waited = true;
}
}
private:
T m_t;
mutable bool m_waited;
Semaphore m_sem;
};
future(std::shared_ptr<internal> sInternal)
: m_sInternal(sInternal)
{
}
private:
mutable std::shared_ptr<internal> m_sInternal;
};
/**
* The class template std::packaged_task wraps a function so that it can be called
* asynchronously. It is based upon C++11 std::packaged_task as documented here:
* http://en.cppreference.com/w/cpp/thread/packaged_task
*
* std::packaged_task uses C++11's rvalue references, which are not available on the
* environments where MaxScale is compiled. Consequently, some care is needed
* when using maxscale::packaged_task so that unintended copying does not occur.
*
* When C++11 is available, it should be straightforward to take the std::packed_task
* into use.
*
* Contrary to std::packaged_task, also due to lack of functionality introduced by
* C++11, maxscale::packaged_task is not fully generic, but can only package a function
* returning a value and taking one argument.
*/
template<class R, class T>
class packaged_task
{
typedef typename future<R>::internal internal;
public:
typedef R (*function)(T);
/**
* Creates a packaged_task with no task and no shared state.
*/
packaged_task()
: m_f(NULL)
, m_get_future_called(false)
{
}
/**
* Creates a packaged_task referring to the provided function.
*
* @param f The function to package.
*/
packaged_task(function f)
: m_f(f)
, m_sInternal(new internal)
, m_get_future_called(false)
{
}
/**
* Move constructor
*
* @note This looks like a regular copy-constructor, but should be treated as
* a move constructor.
*
* @param other The packaged_task to move. After the call, @c other will not
* refer to a packaged task.
*/
packaged_task(const packaged_task& other)
: m_f(other.m_f)
, m_sInternal(other.m_sInternal)
, m_get_future_called(other.m_get_future_called)
{
other.m_f = NULL;
other.m_sInternal.reset();
other.m_get_future_called = false;
}
/**
* Move a packaged_task
*
* @note This looks like a regular assignment operator, but the assigned value
* is moved.
*
* @param rhs The packaged_task to move. After the call, @c rhs will not
* refer to a packaged task.
*/
packaged_task& operator = (const packaged_task& rhs)
{
packaged_task copy(rhs);
copy.swap(*this);
return *this;
}
/**
* Destructor
*/
~packaged_task()
{
if (m_sInternal.get())
{
ss_dassert(m_get_future_called);
// The ownership of m_pFuture has moved to the future
// that was obtained in the call to get_future().
if (!m_get_future_called)
{
MXS_ERROR("Packaged task destructed without future result having been asked for.");
}
}
}
/**
* Swap the content
*
* @param rhs The packaged_task to swap the contents with.
*/
void swap(packaged_task& rhs)
{
std::swap(m_f, rhs.m_f);
std::swap(m_sInternal, rhs.m_sInternal);
std::swap(m_get_future_called, rhs.m_get_future_called);
}
/**
* Checks the validity of the packaged_task.
*
* @return True, if the packaged_task contains share state, false otherwise.
*/
bool valid() const
{
return m_sInternal.get() != NULL;
}
/**
* Returns a future which shares the same shared state as this packaged_task.
*
* @note @c get_future can be called only once for each packaged_task.
*
* @return A future.
*/
future<R> get_future()
{
ss_dassert(!m_get_future_called);
if (!m_get_future_called)
{
m_get_future_called = true;
return future<R>(m_sInternal);
}
else
{
MXS_ERROR("get_future called more than once.");
return future<R>();
}
};
/**
* Calls the stored task with the provided argument.
*
* After this call, anyone waiting for the shared result will be unblocked.
*/
void operator()(T arg)
{
m_sInternal->set(m_f(arg));
}
private:
mutable function m_f;
mutable std::shared_ptr<internal> m_sInternal;
mutable bool m_get_future_called;
};
}

163
include/maxscale/thread.hh Normal file
View File

@ -0,0 +1,163 @@
#pragma once
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2020-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include <maxscale/cppdefs.hh>
#include <tr1/memory>
#include <maxscale/thread.h>
#include <maxscale/future.hh>
namespace maxscale
{
/**
* The class maxscale::thread represents a single thread of execution. It
* is based upon C++11's std::thread as documented here:
* http://en.cppreference.com/w/cpp/thread/thread
*
* std::thread uses C++11's rvalue references, which are not available on the
* environments where MaxScale is compiled. Consequently, some care is needed
* when using maxscale::thread so that unintended copying does not occur.
*
* When C++11 is available, it should be straightforward to take the std::thread
* into use.
*/
class thread
{
public:
/**
* Creates a thread object which does not represent a running thread.
*/
thread();
/**
* Move constructor
*
* @note This looks like a regular copy-constructor, but should be treated as
* a move constructor.
*
* @param other The thread to move. After the call, @c other will not refer
* to a thread.
*/
thread(const thread& other);
/**
* Creates a new thread object and associates it with a thread of execution.
* The new thread will executed the provided task using the provided argument.
*
* @note The actual thread is started by the constructor.
*
* @param task The task to execute in the new thread.
* @param arg The argument to provide to the task when invoked.
* Must remain valid for the lifetime of the thread.
*/
template<class R, class T>
thread(packaged_task<R, T>& task, T arg)
: m_pInternal(new internal(new task_packaged_task<R,T>(task, arg)))
{
run();
}
/**
* Move a thread
*
* @note This looks like a regular assignment operator, but the assigned value
* is moved.
*
* @param rhs The thread to move. After the call, @c rhs will not
* refer to a packaged task.
*/
thread& operator = (const thread& rhs);
/**
* Destructor
*
* The thread must have been joined before the thread object is destructed.
*/
~thread();
/**
* Whether a thread is joinable
*
* @return True, if the thread can be joined, false otherwise.
*/
bool joinable() const;
/**
* Join the thread.
*/
void join();
/**
* Swap the content
*
* @param rhs The thread to swap the contents with.
*/
void swap(thread& rhs);
private:
void run();
class task
{
public:
virtual ~task() {}
virtual void run() = 0;
};
template<class R, class T>
class task_packaged_task : public task
{
public:
task_packaged_task(packaged_task<R, T>& task, T arg)
: m_task(task)
, m_arg(arg)
{
}
void run()
{
m_task(m_arg);
}
private:
packaged_task<R, T> m_task;
T m_arg;
};
class internal
{
public:
internal(task* pTask);
~internal();
bool joinable() const;
void join();
void run();
private:
void main();
static void main(void* pArg);
private:
task* m_pTask;
THREAD m_thread;
};
private:
mutable internal* m_pInternal;
};
}

View File

@ -11,5 +11,5 @@ src=$1
cp -r -t $PWD/maxctrl $src/maxctrl/* && cd $PWD/maxctrl
npm install
npm install pkg
npm install pkg@4.2.3
node_modules/pkg/lib-es5/bin.js -t node6-linux-x64 .

View File

@ -51,7 +51,7 @@ add_library(testcore SHARED testconnections.cpp mariadb_nodes.cpp
sql_t1.cpp test_binlog_fnc.cpp get_my_ip.cpp big_load.cpp get_com_select_insert.cpp
different_size.cpp fw_copy_rules maxinfo_func.cpp config_operations.cpp rds_vpc.cpp execute_cmd.cpp
blob_test.cpp)
target_link_libraries(testcore ${MYSQL_CLIENT} z crypt nsl m pthread ssl crypto dl rt ${CDC_CONNECTOR_LIBRARIES} jansson)
target_link_libraries(testcore ${MYSQL_CLIENT} ${CDC_CONNECTOR_LIBRARIES} ${JANSSON_LIBRARIES} z nsl m pthread ssl dl rt crypto crypt)
install(TARGETS testcore DESTINATION system-test)
add_dependencies(testcore connector-c cdc_connector)

View File

@ -1,21 +0,0 @@
project(cdc_connector)
cmake_minimum_required(VERSION 2.8)
enable_testing()
include(ExternalProject)
include(cmake/BuildJansson.cmake)
set(CMAKE_CXX_FLAGS "-fPIC -std=c++0x")
set(CMAKE_CXX_FLAGS_DEBUG "-fPIC -std=c++0x -ggdb")
set(CMAKE_CXX_FLAGS_RELEASE "-fPIC -std=c++0x -O2")
set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "-fPIC -std=c++0x -O2")
include_directories(${JANSSON_INCLUDE_DIR})
add_library(cdc_connector SHARED cdc_connector.cpp)
target_link_libraries(cdc_connector ${JANSSON_LIBRARIES} crypto)
add_dependencies(cdc_connector jansson)
install(TARGETS cdc_connector DESTINATION lib)
install(FILES cdc_connector.h DESTINATION include)
include(CTest)

View File

@ -1,472 +0,0 @@
#include "cdc_connector.h"
#include <arpa/inet.h>
#include <stdexcept>
#include <unistd.h>
#include <string.h>
#include <sstream>
#include <openssl/sha.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <jansson.h>
#define CDC_CONNECTOR_VERSION "1.0.0"
#define ERRBUF_SIZE 512
#define READBUF_SIZE 1024
static const char OK_RESPONSE[] = "OK\n";
static const char CLOSE_MSG[] = "CLOSE";
static const char REGISTER_MSG[] = "REGISTER UUID=CDC_CONNECTOR-" CDC_CONNECTOR_VERSION ", TYPE=";
static const char REQUEST_MSG[] = "REQUEST-DATA ";
namespace
{
static inline int nointr_read(int fd, void *dest, size_t size)
{
int rc = read(fd, dest, size);
while (rc == -1 && errno == EINTR)
{
rc = read(fd, dest, size);
}
return rc;
}
static inline int nointr_write(int fd, const void *src, size_t size)
{
int rc = write(fd, src, size);
while (rc == -1 && errno == EINTR)
{
rc = write(fd, src, size);
}
return rc;
}
static std::string bin2hex(const uint8_t *data, size_t len)
{
std::string result;
static const char hexconvtab[] = "0123456789abcdef";
for (int i = 0; i < len; i++)
{
result += hexconvtab[data[i] >> 4];
result += hexconvtab[data[i] & 0x0f];
}
return result;
}
std::string generateAuthString(const std::string& user, const std::string& password)
{
uint8_t digest[SHA_DIGEST_LENGTH];
SHA1(reinterpret_cast<const uint8_t*> (password.c_str()), password.length(), digest);
std::string auth_str = user;
auth_str += ":";
std::string part1 = bin2hex((uint8_t*)auth_str.c_str(), auth_str.length());
std::string part2 = bin2hex(digest, sizeof(digest));
return part1 + part2;
}
std::string json_to_string(json_t* json)
{
std::stringstream ss;
switch (json_typeof(json))
{
case JSON_STRING:
ss << json_string_value(json);
break;
case JSON_INTEGER:
ss << json_integer_value(json);
break;
case JSON_REAL:
ss << json_real_value(json);
break;
case JSON_TRUE:
ss << "true";
break;
case JSON_FALSE:
ss << "false";
break;
case JSON_NULL:
break;
default:
break;
}
return ss.str();
}
}
namespace CDC
{
/**
* Public functions
*/
Connection::Connection(const std::string& address,
uint16_t port,
const std::string& user,
const std::string& password,
uint32_t flags) :
m_fd(-1),
m_address(address),
m_port(port),
m_user(user),
m_password(password),
m_flags(flags)
{
}
Connection::~Connection()
{
closeConnection();
}
bool Connection::createConnection()
{
bool rval = false;
struct sockaddr_in remote = {};
remote.sin_port = htons(m_port);
remote.sin_family = AF_INET;
if (inet_aton(m_address.c_str(), (struct in_addr*)&remote.sin_addr.s_addr) == 0)
{
m_error = "Invalid address: ";
m_error += m_address;
}
else
{
int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (fd == -1)
{
char err[ERRBUF_SIZE];
m_error = "Failed to create socket: ";
m_error += strerror_r(errno, err, sizeof (err));
}
m_fd = fd;
if (connect(fd, (struct sockaddr*) &remote, sizeof (remote)) == -1)
{
char err[ERRBUF_SIZE];
m_error = "Failed to connect: ";
m_error += strerror_r(errno, err, sizeof (err));
}
else if (doAuth())
{
rval = doRegistration();
}
}
return rval;
}
void Connection::closeConnection()
{
if (m_fd != -1)
{
nointr_write(m_fd, CLOSE_MSG, sizeof (CLOSE_MSG) - 1);
close(m_fd);
m_fd = -1;
}
}
bool Connection::requestData(const std::string& table, const std::string& gtid)
{
bool rval = true;
std::string req_msg(REQUEST_MSG);
req_msg += table;
if (gtid.length())
{
req_msg += " ";
req_msg += gtid;
}
if (nointr_write(m_fd, req_msg.c_str(), req_msg.length()) == -1)
{
rval = false;
char err[ERRBUF_SIZE];
m_error = "Failed to write request: ";
m_error += strerror_r(errno, err, sizeof (err));
}
else
{
// Read the first row to know if data request was successful
Row row = read();
if (row)
{
m_first_row = row;
}
else
{
rval = false;
}
}
return rval;
}
static inline bool is_schema(json_t* json)
{
bool rval = false;
json_t* j = json_object_get(json, "fields");
if (j && json_is_array(j) && json_array_size(j))
{
rval = json_object_get(json_array_get(j, 0), "name") != NULL;
}
return rval;
}
void Connection::processSchema(json_t* json)
{
m_keys.clear();
m_types.clear();
json_t* arr = json_object_get(json, "fields");
char* raw = json_dumps(json, 0);
size_t i;
json_t* v;
json_array_foreach(arr, i, v)
{
json_t* name = json_object_get(v, "name");
json_t* type = json_object_get(v, "real_type");
std::string nameval = name ? json_string_value(name) : "";
std::string typeval = type ? json_string_value(type) : "undefined";
m_keys.push_back(nameval);
m_types.push_back(typeval);
}
}
Row Connection::processRow(json_t* js)
{
ValueList values;
m_error.clear();
for (ValueList::iterator it = m_keys.begin();
it != m_keys.end(); it++)
{
json_t* v = json_object_get(js, it->c_str());
if (v)
{
values.push_back(json_to_string(v));
}
else
{
m_error = "No value for key found: ";
m_error += *it;
break;
}
}
Row rval;
if (m_error.empty())
{
rval = Row(new InternalRow(m_keys, m_types, values));
}
return rval;
}
Row Connection::read()
{
Row rval;
std::string row;
if (m_first_row)
{
rval.swap(m_first_row);
}
else if (readRow(row))
{
json_error_t err;
json_t* js = json_loads(row.c_str(), JSON_ALLOW_NUL, &err);
if (js)
{
if (is_schema(js))
{
processSchema(js);
rval = Connection::read();
}
else
{
rval = processRow(js);
}
json_decref(js);
}
else
{
m_error = "Failed to parse JSON: ";
m_error += err.text;
}
}
return rval;
}
/**
* Private functions
*/
bool Connection::doAuth()
{
bool rval = false;
std::string auth_str = generateAuthString(m_user, m_password);
/** Send the auth string */
if (nointr_write(m_fd, auth_str.c_str(), auth_str.length()) == -1)
{
char err[ERRBUF_SIZE];
m_error = "Failed to write authentication data: ";
m_error += strerror_r(errno, err, sizeof (err));
}
else
{
/** Read the response */
char buf[READBUF_SIZE];
int bytes;
if ((bytes = nointr_read(m_fd, buf, sizeof (buf))) == -1)
{
char err[ERRBUF_SIZE];
m_error = "Failed to read authentication response: ";
m_error += strerror_r(errno, err, sizeof (err));
}
else if (memcmp(buf, OK_RESPONSE, sizeof (OK_RESPONSE) - 1) != 0)
{
buf[bytes] = '\0';
m_error = "Authentication failed: ";
m_error += buf;
}
else
{
rval = true;
}
}
return rval;
}
bool Connection::doRegistration()
{
bool rval = false;
std::string reg_msg(REGISTER_MSG);
const char *type = "";
if (m_flags & CDC_REQUEST_TYPE_JSON)
{
type = "JSON";
}
else if (m_flags & CDC_REQUEST_TYPE_AVRO)
{
type = "AVRO";
}
reg_msg += type;
/** Send the registration message */
if (nointr_write(m_fd, reg_msg.c_str(), reg_msg.length()) == -1)
{
char err[ERRBUF_SIZE];
m_error = "Failed to write registration message: ";
m_error += strerror_r(errno, err, sizeof (err));
}
else
{
/** Read the response */
char buf[READBUF_SIZE];
int bytes;
if ((bytes = nointr_read(m_fd, buf, sizeof (buf))) == -1)
{
char err[ERRBUF_SIZE];
m_error = "Failed to read registration response: ";
m_error += strerror_r(errno, err, sizeof (err));
}
else if (memcmp(buf, OK_RESPONSE, sizeof (OK_RESPONSE) - 1) != 0)
{
buf[bytes] = '\0';
m_error = "Registration failed: ";
m_error += buf;
}
else
{
rval = true;
}
}
return rval;
}
bool Connection::readRow(std::string& dest)
{
bool rval = true;
while (true)
{
char buf;
int rc = nointr_read(m_fd, &buf, 1);
if (rc == -1)
{
rval = false;
char err[ERRBUF_SIZE];
m_error = "Failed to read row: ";
m_error += strerror_r(errno, err, sizeof (err));
break;
}
if (buf == '\n')
{
break;
}
else
{
dest += buf;
if (dest[0] == 'E' && dest[1] == 'R' & dest[2] == 'R')
{
m_error = "Server responded with an error: ";
m_error += dest;
rval = false;
break;
}
}
}
return rval;
}
}

View File

@ -1,123 +0,0 @@
#include <cstdint>
#include <string>
#include <tr1/memory>
#include <vector>
#include <algorithm>
#include <jansson.h>
/** Request format flags */
#define CDC_REQUEST_TYPE_JSON (1 << 0)
#define CDC_REQUEST_TYPE_AVRO (1 << 1)
namespace CDC
{
// The typedef for the Row type
class InternalRow;
typedef std::tr1::shared_ptr<InternalRow> Row;
typedef std::vector<std::string> ValueList;
// A class that represents a CDC connection
class Connection
{
public:
Connection(const std::string& address,
uint16_t port,
const std::string& user,
const std::string& password,
uint32_t flags = CDC_REQUEST_TYPE_JSON);
virtual ~Connection();
bool createConnection();
bool requestData(const std::string& table, const std::string& gtid = "");
Row read();
void closeConnection();
const std::string& getSchema() const
{
return m_schema;
}
const std::string& getError() const
{
return m_error;
}
private:
int m_fd;
uint32_t m_flags;
uint16_t m_port;
std::string m_address;
std::string m_user;
std::string m_password;
std::string m_error;
std::string m_schema;
ValueList m_keys;
ValueList m_types;
Row m_first_row;
bool doAuth();
bool doRegistration();
bool readRow(std::string& dest);
void processSchema(json_t* json);
Row processRow(json_t*);
};
// Internal representation of a row, used via the Row type
class InternalRow
{
public:
size_t fieldCount() const
{
return m_values.size();
}
const std::string& value(size_t i) const
{
return m_values[i];
}
const std::string& value(const std::string& str) const
{
ValueList::const_iterator it = std::find(m_keys.begin(), m_keys.end(), str);
return m_values[it - m_keys.begin()];
}
const std::string& key(size_t i) const
{
return m_keys[i];
}
const std::string& type(size_t i) const
{
return m_types[i];
}
~InternalRow()
{
}
private:
ValueList m_keys;
ValueList m_types;
ValueList m_values;
// Not intended to be copied
InternalRow(const InternalRow&);
InternalRow& operator=(const InternalRow&);
InternalRow();
// Only a Connection should construct an InternalRow
friend class Connection;
InternalRow(const ValueList& keys,
const ValueList& types,
const ValueList& values):
m_keys(keys),
m_types(types),
m_values(values)
{
}
};
}

View File

@ -178,7 +178,7 @@ bool run_test(TestConnections& test)
std::string name = type_to_table_name(test_set[x].types[i]);
CDC::Connection conn(test.maxscale_IP, 4001, "skysql", "skysql");
if (conn.createConnection() && conn.requestData(name))
if (conn.connect(name))
{
for (int j = 0; test_set[x].values[j]; j++)
{
@ -198,14 +198,14 @@ bool run_test(TestConnections& test)
}
else
{
std::string err = conn.getError();
std::string err = conn.error();
test.tprintf("Failed to read data: %s", err.c_str());
}
}
}
else
{
std::string err = conn.getError();
std::string err = conn.error();
test.tprintf("Failed to request data: %s", err.c_str());
rval = false;
break;

View File

@ -1,9 +1,9 @@
# If the Jansson library is not found, download it and build it from source.
# Download and build the Jansson library
set(JANSSON_REPO "https://github.com/akheron/jansson.git" CACHE STRING "Jansson Git repository")
set(JANSSON_REPO "https://github.com/akheron/jansson.git" CACHE INTERNAL "Jansson Git repository")
# Release 2.9 of Jansson
set(JANSSON_TAG "v2.9" CACHE STRING "Jansson Git tag")
set(JANSSON_TAG "v2.9" CACHE INTERNAL "Jansson Git tag")
ExternalProject_Add(jansson
GIT_REPOSITORY ${JANSSON_REPO}

View File

@ -94,6 +94,8 @@ add_test_executable_notest(sysbench_example.cpp sysbench_example replication)
set(CONNECTOR_C_VERSION "3.0" CACHE STRING "The Connector-C version to use")
include(ExternalProject)
include(GNUInstallDirs)
ExternalProject_Add(connector-c
GIT_REPOSITORY "https://github.com/MariaDB/mariadb-connector-c.git"
GIT_TAG ${CONNECTOR_C_VERSION}
@ -103,13 +105,19 @@ ExternalProject_Add(connector-c
include_directories(${CMAKE_BINARY_DIR}/include)
set(MYSQL_CLIENT ${CMAKE_BINARY_DIR}/lib/mariadb/libmariadbclient.a CACHE INTERNAL "")
# Build Jansson
include(cmake/BuildJansson.cmake)
include_directories(${JANSSON_INCLUDE_DIR})
# Build the CDC connector
ExternalProject_Add(cdc_connector
SOURCE_DIR ${CMAKE_SOURCE_DIR}/cdc_connector/
CMAKE_ARGS -DCMAKE_INSTALL_PREFIX=${CMAKE_BINARY_DIR}/cdc_connector/
GIT_REPOSITORY "https://github.com/mariadb-corporation/maxscale-cdc-connector"
CMAKE_ARGS -DCMAKE_INSTALL_PREFIX=${CMAKE_BINARY_DIR}/cdc_connector/ -DJANSSON_INCLUDE_DIR=${JANSSON_INCLUDE_DIR}
BUILD_COMMAND make
INSTALL_COMMAND make install)
INSTALL_COMMAND make install
UPDATE_COMMAND "")
add_dependencies(cdc_connector jansson)
set(CDC_CONNECTOR_INCLUDE ${CMAKE_BINARY_DIR}/cdc_connector/include/ CACHE INTERNAL "")
set(CDC_CONNECTOR_LIBRARIES ${CMAKE_BINARY_DIR}/cdc_connector/lib/libcdc_connector.so CACHE INTERNAL "")
set(CDC_CONNECTOR_LIBRARIES ${CMAKE_BINARY_DIR}/cdc_connector/${CMAKE_INSTALL_LIBDIR}/libcdc_connector.a CACHE INTERNAL "")
include_directories(${CMAKE_BINARY_DIR}/cdc_connector/include)

View File

@ -15,6 +15,7 @@ add_executable(test_semaphore testsemaphore.cc)
add_executable(test_server testserver.cc)
add_executable(test_service testservice.cc)
add_executable(test_spinlock testspinlock.cc)
add_executable(test_thread testthread.cc)
add_executable(test_trxcompare testtrxcompare.cc ../../../query_classifier/test/testreader.cc)
add_executable(test_trxtracking testtrxtracking.cc)
add_executable(test_users testusers.cc)
@ -42,6 +43,7 @@ target_link_libraries(test_semaphore maxscale-common)
target_link_libraries(test_server maxscale-common)
target_link_libraries(test_service maxscale-common)
target_link_libraries(test_spinlock maxscale-common)
target_link_libraries(test_thread maxscale-common)
target_link_libraries(test_trxcompare maxscale-common)
target_link_libraries(test_trxtracking maxscale-common)
target_link_libraries(test_users maxscale-common)
@ -71,6 +73,7 @@ add_test(TestSemaphore test_semaphore)
add_test(TestServer test_server)
add_test(TestService test_service)
add_test(TestSpinlock test_spinlock)
add_test(TestThread test_thread)
add_test(TestUsers test_users)
add_test(TestUtils test_utils)
add_test(TestModulecmd testmodulecmd)

View File

@ -0,0 +1,100 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2020-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include <stdlib.h>
#include <iostream>
#include <vector>
#include <maxscale/thread.hh>
// We want asserts in release mode as well.
#if !defined(SS_DEBUG)
#define SS_DEBUG
#endif
#include <maxscale/debug.h>
using std::cout;
using std::endl;
using std::vector;
int function(int i)
{
return i / 2;
}
void test_basics()
{
cout << __func__ << endl;
mxs::packaged_task<int, int> t1;
ss_dassert(!t1.valid());
mxs::packaged_task<int, int> t2(function);
ss_dassert(t2.valid());
t1 = t2; // Move task.
ss_dassert(t1.valid());
ss_dassert(!t2.valid());
mxs::future<int> f1;
ss_dassert(!f1.valid());
mxs::future<int> f2 = t1.get_future();
ss_dassert(t1.valid());
ss_dassert(f2.valid());
f1 = f2; // Move future
ss_dassert(f1.valid());
ss_dassert(!f2.valid());
}
void test_running()
{
cout << __func__ << endl;
const int N = 10;
vector<mxs::future<int> > results;
vector<mxs::thread> threads;
cout << "Starting threads" << endl;
for (int i = 0; i < N; ++i)
{
cout << i << endl;
mxs::packaged_task<int, int> task(function);
mxs::future<int> r = task.get_future();
int arg = i;
mxs::thread t(task, arg);
results.push_back(r);
threads.push_back(t);
}
cout << "All threads started." << endl;
cout << "Waiting for threads." << endl;
for (int i = 0; i < N; ++i)
{
cout << i << endl;
threads[i].join();
int got = results[i].get();
int expected = function(i);
ss_dassert(got == expected);
}
}
int main()
{
test_basics();
test_running();
return EXIT_SUCCESS;
}

View File

@ -12,6 +12,7 @@
*/
#include <maxscale/thread.h>
#include <maxscale/thread.hh>
#include <maxscale/log_manager.h>
THREAD *thread_start(THREAD *thd, void (*entry)(void *), void *arg, size_t stack_size)
@ -68,3 +69,116 @@ void thread_millisleep(int ms)
req.tv_nsec = (ms % 1000) * 1000000;
nanosleep(&req, NULL);
}
//
// maxscale::thread
//
namespace maxscale
{
thread::thread()
: m_pInternal(NULL)
{
}
thread::thread(const thread& other)
: m_pInternal(other.m_pInternal)
{
other.m_pInternal = NULL;
}
thread& thread::operator = (const thread& rhs)
{
thread copy(rhs);
copy.swap(*this);
return *this;
}
thread::~thread()
{
ss_dassert(!joinable());
if (joinable())
{
MXS_ERROR("A thread that has not been joined is destructed.");
}
else
{
delete m_pInternal;
}
}
bool thread::joinable() const
{
return m_pInternal ? m_pInternal->joinable() : false;
}
void thread::join()
{
ss_dassert(m_pInternal);
if (!m_pInternal)
{
MXS_ERROR("Attempt to join a non-joinable thread.");
}
else
{
m_pInternal->join();
}
}
void thread::swap(thread& rhs)
{
std::swap(m_pInternal, rhs.m_pInternal);
}
void thread::run()
{
ss_dassert(m_pInternal);
m_pInternal->run();
}
thread::internal::internal(thread::task* pTask)
: m_pTask(pTask)
, m_thread(0)
{
}
thread::internal::~internal()
{
ss_info_dassert(!m_pTask, "Thread not joined before destructed.");
ss_dassert(m_thread == 0);
}
bool thread::internal::joinable() const
{
return m_thread != 0;
}
void thread::internal::join()
{
ss_dassert(joinable());
thread_wait(m_thread);
delete m_pTask;
m_pTask = NULL;
m_thread = 0;
}
void thread::internal::run()
{
if (!thread_start(&m_thread, &thread::internal::main, this, 0))
{
MXS_ALERT("Could not start thread, MaxScale is likely to malfunction.");
}
}
void thread::internal::main()
{
m_pTask->run();
}
void thread::internal::main(void* pArg)
{
static_cast<internal*>(pArg)->main();
}
}

View File

@ -9,6 +9,9 @@ add_library(cachetester
../../../../../query_classifier/test/testreader.cc
)
# Depends on C/C
add_dependencies(cachetester connector-c)
add_executable(testrules testrules.cc ../rules.cc)
target_link_libraries(testrules maxscale-common ${JANSSON_LIBRARIES})

View File

@ -7,6 +7,10 @@ if(BISON_FOUND AND FLEX_FOUND)
include_directories(${CMAKE_CURRENT_BINARY_DIR})
include_directories(${CMAKE_CURRENT_SOURCE_DIR})
add_library(dbfwfilter-core STATIC ${BISON_ruleparser_OUTPUTS} ${FLEX_token_OUTPUTS} rules.cc user.cc)
# Depends on PCRE2
add_dependencies(dbfwfilter-core pcre2 connector-c)
add_library(dbfwfilter SHARED dbfwfilter.cc)
target_link_libraries(dbfwfilter maxscale-common MySQLCommon dbfwfilter-core)
set_target_properties(dbfwfilter PROPERTIES VERSION "1.0.0")

View File

@ -35,15 +35,15 @@
#include "../../../core/maxscale/monitor.h"
/** Column positions for SHOW SLAVE STATUS */
#define MYSQL55_STATUS_BINLOG_POS 5
#define MYSQL55_STATUS_BINLOG_NAME 6
#define MYSQL55_STATUS_MASTER_LOG_POS 5
#define MYSQL55_STATUS_MASTER_LOG_FILE 6
#define MYSQL55_STATUS_IO_RUNNING 10
#define MYSQL55_STATUS_SQL_RUNNING 11
#define MYSQL55_STATUS_MASTER_ID 39
/** Column positions for SHOW SLAVE STATUS */
#define MARIA10_STATUS_BINLOG_NAME 7
#define MARIA10_STATUS_BINLOG_POS 8
#define MARIA10_STATUS_MASTER_LOG_FILE 7
#define MARIA10_STATUS_MASTER_LOG_POS 8
#define MARIA10_STATUS_IO_RUNNING 12
#define MARIA10_STATUS_SQL_RUNNING 13
#define MARIA10_STATUS_MASTER_ID 41
@ -57,8 +57,11 @@
#define SLAVE_HOSTS_PORT 2
using std::string;
static void monitorMain(void *);
typedef std::vector<MXS_MONITORED_SERVER*> ServerVector;
typedef std::vector<string> StringVector;
class MySqlServerInfo;
static void monitorMain(void *);
static void *startMonitor(MXS_MONITOR *, const MXS_CONFIG_PARAMETER*);
static void stopMonitor(MXS_MONITOR *);
static bool stop_monitor(MXS_MONITOR *);
@ -72,8 +75,13 @@ static void set_slave_heartbeat(MXS_MONITOR *, MXS_MONITORED_SERVER *);
static int add_slave_to_master(long *, int, long);
static bool isMySQLEvent(mxs_monitor_event_t event);
void check_maxscale_schema_replication(MXS_MONITOR *monitor);
static bool mon_process_failover(MYSQL_MONITOR* monitor, const char* failover_script, uint32_t failover_timeout);
static bool mon_process_failover(MYSQL_MONITOR* monitor,
const char* failover_script,
uint32_t failover_timeout);
static bool do_failover(MYSQL_MONITOR* mon);
static void update_gtid_slave_pos(MXS_MONITORED_SERVER *database, int64_t domain, MySqlServerInfo* info);
static bool update_replication_settings(MXS_MONITORED_SERVER *database, MySqlServerInfo* info);
static bool report_version_err = true;
static const char* hb_table_name = "maxscale_schema.replication_heartbeat";
@ -101,8 +109,6 @@ static const char CN_REPLICATION_PASSWORD[] = "replication_password";
/** Default master failure verification timeout */
#define DEFAULT_MASTER_FAILURE_TIMEOUT "10"
typedef std::vector<MXS_MONITORED_SERVER*> ServerVector;
// TODO: Specify the real default failover script.
static const char DEFAULT_FAILOVER_SCRIPT[] =
"/usr/bin/echo INITIATOR=$INITIATOR "
@ -572,20 +578,89 @@ MXS_MODULE* MXS_CREATE_MODULE()
}
class Gtid
{
public:
uint32_t domain;
uint32_t server_id;
uint64_t sequence;
Gtid()
: domain(0)
, server_id(0)
, sequence(0)
{}
/**
* Parse a Gtid-triplet from a string. In case of a multi-triplet value, only the triplet with the given domain
* is returned.
*
* @param str Gtid string
* @param search_domain The Gtid domain whose triplet should be returned. Negative domain means autoselect,
* which is only allowed when the string contains one triplet.
*/
Gtid(const char* str, int64_t search_domain = -1)
: domain(0)
, server_id(0)
, sequence(0)
{
// Autoselect only allowed with one triplet
ss_dassert(search_domain >= 0 || strchr(str, ',') == NULL);
parse_triplet(str);
if (search_domain >= 0 && domain != search_domain)
{
// Search for the correct triplet.
bool found = false;
for (const char* next_triplet = strchr(str, ',');
next_triplet != NULL && !found;
next_triplet = strchr(next_triplet, ','))
{
parse_triplet(++next_triplet);
if (domain == search_domain)
{
found = true;
}
}
ss_dassert(found);
}
}
private:
void parse_triplet(const char* str)
{
ss_debug(int rv =) sscanf(str, "%" PRIu32 "-%" PRIu32 "-%" PRIu64, &domain, &server_id, &sequence);
ss_dassert(rv == 3);
}
};
// Contains data returned by one row of SHOW ALL SLAVES STATUS
class SlaveStatusInfo
{
public:
int master_id; /**< Master server id */
bool slave_io; /**< If Slave IO thread is running */
bool slave_sql; /**< If Slave SQL thread is running */
uint64_t binlog_pos; /**< Binlog position */
string binlog_name; /**< Binlog name */
int master_server_id; /**< The master's server_id value. */
bool slave_io_running; /**< Whether the slave I/O thread is running and connected. */
bool slave_sql_running; /**< Whether or not the SQL thread is running. */
string master_log_file; /**< Name of the master binary log file that the I/O thread is currently reading from. */
uint64_t read_master_log_pos; /**< Position up to which the I/O thread has read in the current master
* binary log file. */
Gtid gtid_io_pos; /**< Gtid I/O position of the slave thread. */
SlaveStatusInfo()
: master_id(0),
slave_io(false),
slave_sql(false),
binlog_pos(0)
: master_server_id(0),
slave_io_running(false),
slave_sql_running(false),
read_master_log_pos(0)
{}
};
// This class groups some miscellaneous replication related settings together.
class ReplicationSettings
{
public:
bool gtid_strict_mode; /**< Enable additional checks for replication */
bool log_bin; /**< Is binary logging enabled */
bool log_slave_updates;/**< Does the slave log replicated events to binlog */
ReplicationSettings()
: gtid_strict_mode(false)
, log_bin(false)
, log_slave_updates(false)
{}
};
@ -604,16 +679,13 @@ public:
bool binlog_relay; /** Server is a Binlog Relay */
int n_slaves_configured; /**< Number of configured slave connections*/
int n_slaves_running; /**< Number of running slave connections */
int slave_heartbeats; /**< Number of received heartbeats*/
int slave_heartbeats; /**< Number of received heartbeats */
double heartbeat_period; /**< The time interval between heartbeats */
time_t latest_event; /**< Time when latest event was received from the master */
struct
{
uint32_t domain;
uint32_t server_id;
uint64_t sequence;
} slave_gtid;
Gtid gtid_slave_pos; /**< Gtid of latest replicated event. Only shows the triplet with the
* same domain as Gtid_IO_Pos. */
SlaveStatusInfo slave_status; /**< Data returned from SHOW SLAVE STATUS */
ReplicationSettings rpl_settings; /**< Miscellaneous replication related settings */
MySqlServerInfo()
: server_id(0),
@ -625,9 +697,18 @@ public:
n_slaves_running(0),
slave_heartbeats(0),
heartbeat_period(0),
latest_event(0),
slave_gtid({0, 0, 0})
latest_event(0)
{}
/**
* Calculate how many events are left in relay log.
*
* @return Number of events in relay log according to latest queried info.
*/
int64_t relay_log_events()
{
return slave_status.gtid_io_pos.sequence - gtid_slave_pos.sequence;
}
};
void* info_copy_func(const void *val)
@ -740,7 +821,8 @@ startMonitor(MXS_MONITOR *monitor, const MXS_CONFIG_PARAMETER* params)
else
{
handle = (MYSQL_MONITOR *) MXS_MALLOC(sizeof(MYSQL_MONITOR));
HASHTABLE *server_info = hashtable_alloc(MAX_NUM_SLAVES, hashtable_item_strhash, hashtable_item_strcmp);
HASHTABLE *server_info = hashtable_alloc(MAX_NUM_SLAVES,
hashtable_item_strhash, hashtable_item_strcmp);
if (handle == NULL || server_info == NULL)
{
@ -892,11 +974,11 @@ static void diagnostics(DCB *dcb, const MXS_MONITOR *mon)
dcb_printf(dcb, "Server ID: %d\n", serv_info->server_id);
dcb_printf(dcb, "Read only: %s\n", serv_info->read_only ? "ON" : "OFF");
dcb_printf(dcb, "Slave configured: %s\n", serv_info->slave_configured ? "YES" : "NO");
dcb_printf(dcb, "Slave IO running: %s\n", serv_info->slave_status.slave_io ? "YES" : "NO");
dcb_printf(dcb, "Slave SQL running: %s\n", serv_info->slave_status.slave_sql ? "YES" : "NO");
dcb_printf(dcb, "Master ID: %d\n", serv_info->slave_status.master_id);
dcb_printf(dcb, "Master binlog file: %s\n", serv_info->slave_status.binlog_name.c_str());
dcb_printf(dcb, "Master binlog position: %lu\n", serv_info->slave_status.binlog_pos);
dcb_printf(dcb, "Slave IO running: %s\n", serv_info->slave_status.slave_io_running ? "YES" : "NO");
dcb_printf(dcb, "Slave SQL running: %s\n", serv_info->slave_status.slave_sql_running ? "YES" : "NO");
dcb_printf(dcb, "Master ID: %d\n", serv_info->slave_status.master_server_id);
dcb_printf(dcb, "Master binlog file: %s\n", serv_info->slave_status.master_log_file.c_str());
dcb_printf(dcb, "Master binlog position: %lu\n", serv_info->slave_status.read_master_log_pos);
if (handle->multimaster)
{
@ -951,15 +1033,19 @@ static json_t* diagnostics_json(const MXS_MONITOR *mon)
MySqlServerInfo *serv_info = get_server_info(handle, db);
json_object_set_new(srv, "name", json_string(db->server->unique_name));
json_object_set_new(srv, "server_id", json_integer(serv_info->server_id));
json_object_set_new(srv, "master_id", json_integer(serv_info->slave_status.master_id));
json_object_set_new(srv, "master_id", json_integer(serv_info->slave_status.master_server_id));
json_object_set_new(srv, "read_only", json_boolean(serv_info->read_only));
json_object_set_new(srv, "slave_configured", json_boolean(serv_info->slave_configured));
json_object_set_new(srv, "slave_io_running", json_boolean(serv_info->slave_status.slave_io));
json_object_set_new(srv, "slave_sql_running", json_boolean(serv_info->slave_status.slave_sql));
json_object_set_new(srv, "slave_io_running",
json_boolean(serv_info->slave_status.slave_io_running));
json_object_set_new(srv, "slave_sql_running",
json_boolean(serv_info->slave_status.slave_sql_running));
json_object_set_new(srv, "master_binlog_file", json_string(serv_info->slave_status.binlog_name.c_str()));
json_object_set_new(srv, "master_binlog_position", json_integer(serv_info->slave_status.binlog_pos));
json_object_set_new(srv, "master_binlog_file",
json_string(serv_info->slave_status.master_log_file.c_str()));
json_object_set_new(srv, "master_binlog_position",
json_integer(serv_info->slave_status.read_master_log_pos));
if (handle->multimaster)
{
@ -998,43 +1084,37 @@ static enum mysql_server_version get_server_version(MXS_MONITORED_SERVER* db)
return MYSQL_SERVER_VERSION_51;
}
static void extract_slave_gtid(MySqlServerInfo* info, const char* str)
{
sscanf(str, "%" PRIu32 "-%" PRIu32 "-%" PRIu64, &info->slave_gtid.domain,
&info->slave_gtid.server_id, &info->slave_gtid.sequence);
}
static bool do_show_slave_status(MySqlServerInfo* serv_info, MXS_MONITORED_SERVER* database,
enum mysql_server_version server_version)
{
bool rval = true;
unsigned int columns;
int i_io_thread, i_sql_thread, i_binlog_pos, i_master_id, i_binlog_name;
int i_slave_io_running, i_slave_sql_running, i_read_master_log_pos, i_master_server_id, i_master_log_file;
const char *query;
if (server_version == MYSQL_SERVER_VERSION_100)
{
columns = 42;
query = "SHOW ALL SLAVES STATUS";
i_io_thread = MARIA10_STATUS_IO_RUNNING;
i_sql_thread = MARIA10_STATUS_SQL_RUNNING;
i_binlog_name = MARIA10_STATUS_BINLOG_NAME;
i_binlog_pos = MARIA10_STATUS_BINLOG_POS;
i_master_id = MARIA10_STATUS_MASTER_ID;
i_slave_io_running = MARIA10_STATUS_IO_RUNNING;
i_slave_sql_running = MARIA10_STATUS_SQL_RUNNING;
i_master_log_file = MARIA10_STATUS_MASTER_LOG_FILE;
i_read_master_log_pos = MARIA10_STATUS_MASTER_LOG_POS;
i_master_server_id = MARIA10_STATUS_MASTER_ID;
}
else
{
columns = server_version == MYSQL_SERVER_VERSION_55 ? 40 : 38;
query = "SHOW SLAVE STATUS";
i_io_thread = MYSQL55_STATUS_IO_RUNNING;
i_sql_thread = MYSQL55_STATUS_SQL_RUNNING;
i_binlog_name = MYSQL55_STATUS_BINLOG_NAME;
i_binlog_pos = MYSQL55_STATUS_BINLOG_POS;
i_master_id = MYSQL55_STATUS_MASTER_ID;
i_slave_io_running = MYSQL55_STATUS_IO_RUNNING;
i_slave_sql_running = MYSQL55_STATUS_SQL_RUNNING;
i_master_log_file = MYSQL55_STATUS_MASTER_LOG_FILE;
i_read_master_log_pos = MYSQL55_STATUS_MASTER_LOG_POS;
i_master_server_id = MYSQL55_STATUS_MASTER_ID;
}
MYSQL_RES* result;
int master_id = -1;
int master_server_id = -1;
int nconfigured = 0;
int nrunning = 0;
@ -1058,25 +1138,25 @@ static bool do_show_slave_status(MySqlServerInfo* serv_info, MXS_MONITORED_SERVE
do
{
/* get Slave_IO_Running and Slave_SQL_Running values*/
serv_info->slave_status.slave_io = strncmp(row[i_io_thread], "Yes", 3) == 0;
serv_info->slave_status.slave_sql = strncmp(row[i_sql_thread], "Yes", 3) == 0;
serv_info->slave_status.slave_io_running = strncmp(row[i_slave_io_running], "Yes", 3) == 0;
serv_info->slave_status.slave_sql_running = strncmp(row[i_slave_sql_running], "Yes", 3) == 0;
if (serv_info->slave_status.slave_io && serv_info->slave_status.slave_sql)
if (serv_info->slave_status.slave_io_running && serv_info->slave_status.slave_sql_running)
{
if (nrunning == 0)
{
/** Only check binlog name for the first running slave */
uint64_t binlog_pos = atol(row[i_binlog_pos]);
char* binlog_name = row[i_binlog_name];
if (serv_info->slave_status.binlog_name != binlog_name ||
binlog_pos != serv_info->slave_status.binlog_pos)
uint64_t read_master_log_pos = atol(row[i_read_master_log_pos]);
char* master_log_file = row[i_master_log_file];
if (serv_info->slave_status.master_log_file != master_log_file ||
read_master_log_pos != serv_info->slave_status.read_master_log_pos)
{
// IO thread is reading events from the master
serv_info->latest_event = time(NULL);
}
serv_info->slave_status.binlog_name = binlog_name;
serv_info->slave_status.binlog_pos = binlog_pos;
serv_info->slave_status.master_log_file = master_log_file;
serv_info->slave_status.read_master_log_pos = read_master_log_pos;
}
nrunning++;
@ -1087,13 +1167,13 @@ static bool do_show_slave_status(MySqlServerInfo* serv_info, MXS_MONITORED_SERVE
* root master server.
* Please note, there could be no slaves at all if Slave_SQL_Running == 'No'
*/
if (serv_info->slave_status.slave_io && server_version != MYSQL_SERVER_VERSION_51)
if (serv_info->slave_status.slave_io_running && server_version != MYSQL_SERVER_VERSION_51)
{
/* Get Master_Server_Id */
master_id = atoi(row[i_master_id]);
if (master_id == 0)
master_server_id = atoi(row[i_master_server_id]);
if (master_server_id == 0)
{
master_id = -1;
master_server_id = -1;
}
}
@ -1101,7 +1181,7 @@ static bool do_show_slave_status(MySqlServerInfo* serv_info, MXS_MONITORED_SERVE
{
const char* beats = mxs_mysql_get_value(result, row, "Slave_received_heartbeats");
const char* period = mxs_mysql_get_value(result, row, "Slave_heartbeat_period");
const char* gtid = mxs_mysql_get_value(result, row, "Gtid_Slave_Pos");
const char* gtid_io_pos = mxs_mysql_get_value(result, row, "Gtid_IO_Pos");
ss_dassert(beats && period);
int heartbeats = atoi(beats);
@ -1112,9 +1192,16 @@ static bool do_show_slave_status(MySqlServerInfo* serv_info, MXS_MONITORED_SERVE
serv_info->heartbeat_period = atof(period);
}
if (gtid)
if (serv_info->slave_status.slave_sql_running && gtid_io_pos)
{
extract_slave_gtid(serv_info, gtid);
Gtid io_pos = Gtid(gtid_io_pos);
serv_info->slave_status.gtid_io_pos = io_pos;
update_gtid_slave_pos(database, io_pos.domain, serv_info);
}
else
{
serv_info->slave_status.gtid_io_pos = Gtid();
serv_info->gtid_slave_pos = Gtid();
}
}
@ -1127,15 +1214,12 @@ static bool do_show_slave_status(MySqlServerInfo* serv_info, MXS_MONITORED_SERVE
{
/** Query returned no rows, replication is not configured */
serv_info->slave_configured = false;
serv_info->slave_status.slave_io = false;
serv_info->slave_status.slave_sql = false;
serv_info->slave_status.binlog_pos = 0;
serv_info->slave_status.binlog_name = "";
serv_info->slave_heartbeats = 0;
serv_info->slave_gtid = {};
serv_info->gtid_slave_pos = Gtid();
serv_info->slave_status = SlaveStatusInfo();
}
serv_info->slave_status.master_id = master_id;
serv_info->slave_status.master_server_id = master_server_id;
mysql_free_result(result);
}
else
@ -1177,7 +1261,8 @@ static bool master_still_alive(MYSQL_MONITOR* handle)
{
MySqlServerInfo* info = get_server_info(handle, s);
if (info->slave_configured && info->slave_status.master_id == handle->master->server->node_id &&
if (info->slave_configured &&
info->slave_status.master_server_id == handle->master->server->node_id &&
difftime(time(NULL), info->latest_event) < handle->master_failure_timeout)
{
/**
@ -1211,7 +1296,7 @@ static inline void monitor_mysql_db(MXS_MONITORED_SERVER* database, MySqlServerI
}
/** Store master_id of current node. For MySQL 5.1 it will be set at a later point. */
database->server->master_id = serv_info->slave_status.master_id;
database->server->master_id = serv_info->slave_status.master_server_id;
}
}
@ -1630,12 +1715,12 @@ void find_graph_cycles(MYSQL_MONITOR *handle, MXS_MONITORED_SERVER *database, in
/** Build the graph */
for (int i = 0; i < nservers; i++)
{
if (graph[i].info->slave_status.master_id > 0)
if (graph[i].info->slave_status.master_server_id > 0)
{
/** Found a connected node */
for (int k = 0; k < nservers; k++)
{
if (graph[k].info->server_id == graph[i].info->slave_status.master_id)
if (graph[k].info->server_id == graph[i].info->slave_status.master_server_id)
{
graph[i].parent = &graph[k];
break;
@ -2322,7 +2407,8 @@ static void set_master_heartbeat(MYSQL_MONITOR *handle, MXS_MONITORED_SERVER *da
database->server->node_ts = heartbeat;
sprintf(heartbeat_insert_query,
"UPDATE maxscale_schema.replication_heartbeat SET master_timestamp = %lu WHERE master_server_id = %li AND maxscale_id = %lu",
"UPDATE maxscale_schema.replication_heartbeat "
"SET master_timestamp = %lu WHERE master_server_id = %li AND maxscale_id = %lu",
heartbeat, handle->master->server->node_id, id);
/* Try to insert MaxScale timestamp into master */
@ -2341,7 +2427,8 @@ static void set_master_heartbeat(MYSQL_MONITOR *handle, MXS_MONITORED_SERVER *da
{
heartbeat = time(0);
sprintf(heartbeat_insert_query,
"REPLACE INTO maxscale_schema.replication_heartbeat (master_server_id, maxscale_id, master_timestamp ) VALUES ( %li, %lu, %lu)",
"REPLACE INTO maxscale_schema.replication_heartbeat "
"(master_server_id, maxscale_id, master_timestamp ) VALUES ( %li, %lu, %lu)",
handle->master->server->node_id, id, heartbeat);
if (mxs_mysql_query(database->con, heartbeat_insert_query))
@ -2420,8 +2507,8 @@ static void set_slave_heartbeat(MXS_MONITOR* mon, MXS_MONITORED_SERVER *database
heartbeat = time(0);
slave_read = strtoul(row[0], NULL, 10);
if ((errno == ERANGE && (slave_read == LONG_MAX || slave_read == LONG_MIN)) || (errno != 0 &&
slave_read == 0))
if ((errno == ERANGE && (slave_read == LONG_MAX || slave_read == LONG_MIN)) ||
(errno != 0 && slave_read == 0))
{
slave_read = 0;
}
@ -2951,7 +3038,8 @@ bool mon_process_failover(MYSQL_MONITOR* monitor, const char* failover_script, u
if (failed_master)
{
MXS_NOTICE("Performing automatic failover to replace failed master '%s'.", failed_master->server->unique_name);
MXS_NOTICE("Performing automatic failover to replace failed master '%s'.",
failed_master->server->unique_name);
rval = do_failover(monitor);
}
@ -2967,33 +3055,67 @@ bool mon_process_failover(MYSQL_MONITOR* monitor, const char* failover_script, u
*/
MXS_MONITORED_SERVER* failover_select_new_master(MYSQL_MONITOR* mon, ServerVector* out_slaves)
{
// Select a new master candidate. Currently does not properly wait for relay logs to clear. Requires that
// "detect_stale_slave" is on.
/* Select a new master candidate. Selects the one with the latest event in relay log.
* If multiple slaves have same number of events, select the one with most processed events. */
MXS_MONITORED_SERVER* new_master = NULL;
MySqlServerInfo* new_master_info = NULL;
int master_vector_index = -1;
for (MXS_MONITORED_SERVER *mon_server = mon->monitor->monitored_servers; mon_server; mon_server = mon_server->next)
for (MXS_MONITORED_SERVER *mon_server = mon->monitor->monitored_servers;
mon_server;
mon_server = mon_server->next)
{
MySqlServerInfo* cand_info = get_server_info(mon, mon_server);
if (cand_info->slave_status.slave_sql) // Assumed to be a valid slave.
if (cand_info->slave_status.slave_sql_running && update_replication_settings(mon_server, cand_info))
{
if (out_slaves)
{
out_slaves->push_back(mon_server);
}
bool set_master = false;
// Accept any candidate at this point.
if (cand_info->rpl_settings.log_bin == false)
{
MXS_WARNING("Failover: Slave '%s' has binary log disabled and is not a valid promotion "
"candidate.", mon_server->server->unique_name);
continue;
}
if (cand_info->rpl_settings.gtid_strict_mode == false)
{
MXS_WARNING("Failover: Slave '%s' has gtid_strict_mode disabled. Enabling this setting is "
"recommended. For more information, see "
"https://mariadb.com/kb/en/library/gtid/#gtid_strict_mode",
mon_server->server->unique_name);
}
if (cand_info->rpl_settings.log_slave_updates == false)
{
MXS_WARNING("Failover: Slave '%s' has log_slave_updates disabled. It is a valid candidate "
"but replication will break for lagging slaves if '%s' is promoted.",
mon_server->server->unique_name, mon_server->server->unique_name);
}
bool select_this = false;
// If no candidate yet, accept any.
if (new_master == NULL)
{
set_master = true;
select_this = true;
}
// TODO: Add more checks here, this may give wrong result if filenames are different
else if (cand_info->slave_status.binlog_pos > new_master_info->slave_status.binlog_pos)
else
{
set_master = true;
uint64_t cand_io = cand_info->slave_status.gtid_io_pos.sequence;
uint64_t cand_processed = cand_info->gtid_slave_pos.sequence;
uint64_t master_io = new_master_info->slave_status.gtid_io_pos.sequence;
uint64_t master_processed = new_master_info->gtid_slave_pos.sequence;
bool cand_updates = cand_info->rpl_settings.log_slave_updates;
bool master_updates = new_master_info->rpl_settings.log_slave_updates;
// Otherwise accept a slave with a later event in relay log.
if (cand_io > master_io ||
// If io sequences are identical, the slave with more events processed wins.
(cand_io == master_io && (cand_processed > master_processed ||
// Finally, if binlog positions are identical, prefer a slave with log_slave_updates.
(cand_processed == master_processed && cand_updates && !master_updates))))
{
select_this = true;
}
}
if (set_master)
if (select_this)
{
new_master = mon_server;
new_master_info = cand_info;
@ -3015,6 +3137,44 @@ MXS_MONITORED_SERVER* failover_select_new_master(MYSQL_MONITOR* mon, ServerVecto
return new_master;
}
/**
* Waits until the new master has processed all its relay log, or time is up.
*
* @param mon The monitor
* @param new_master The new master
* @return True if relay log was processed within time limit, or false if time ran out or an error occurred.
*/
bool failover_wait_relay_log(MYSQL_MONITOR* mon, MXS_MONITORED_SERVER* new_master)
{
MySqlServerInfo* master_info = get_server_info(mon, new_master);
time_t begin = time(NULL);
bool query_ok = true;
while (master_info->relay_log_events() > 0 &&
query_ok &&
difftime(time(NULL), begin) < mon->failover_timeout)
{
MXS_NOTICE("Failover: Relay log of server '%s' not yet empty, waiting to clear %" PRId64 " events.",
new_master->server->unique_name, master_info->relay_log_events());
thread_millisleep(1000); // Sleep for a while before querying server again.
// Todo: check server version before entering failover.
query_ok = do_show_slave_status(master_info, new_master, MYSQL_SERVER_VERSION_100);
}
bool rval = false;
if (master_info->relay_log_events() == 0)
{
rval = true;
}
else
{
MXS_ALERT("Failover: %s while waiting for server '%s' to process relay log.",
query_ok ? "Timeout" : "Status query error",
new_master->server->unique_name);
rval = false;
}
return rval;
}
/**
* Prepares a server for the replication master role.
*
@ -3092,7 +3252,7 @@ bool failover_redirect_slaves(MYSQL_MONITOR* mon, ServerVector& slaves, MXS_MONI
* @param mon Server cluster monitor
* @return True if successful
*/
bool do_failover(MYSQL_MONITOR* mon)
static bool do_failover(MYSQL_MONITOR* mon)
{
// Topology has already been tested to be simple.
// Step 1: Select new master. Also populate a vector with all slaves not the selected master.
@ -3100,16 +3260,101 @@ bool do_failover(MYSQL_MONITOR* mon)
MXS_MONITORED_SERVER* new_master = failover_select_new_master(mon, &redirect_slaves);
if (new_master == NULL)
{
MXS_ERROR("Failover: No suitable promotion candidates found, cancelling.");
MXS_ERROR("Failover: No suitable promotion candidate found, cancelling.");
return false;
}
bool rval = false;
// Step 2: Stop and reset slave, set read-only to 0.
if (failover_promote_new_master(mon, new_master))
// Step 2: Wait until relay log consumed.
if (failover_wait_relay_log(mon, new_master) &&
// Step 3: Stop and reset slave, set read-only to 0.
failover_promote_new_master(mon, new_master) &&
// Step 4: Redirect slaves.
failover_redirect_slaves(mon, redirect_slaves, new_master))
{
// Step 3: Redirect slaves.
rval = failover_redirect_slaves(mon, redirect_slaves, new_master);
return true;
}
return false;
}
/**
* Query one row of results, save strings to array. Any additional rows are ignored.
*
* @param database The database to query.
* @param query The query to execute.
* @param expected_cols How many columns the result should have.
* @param output The output array to populate.
* @return True on success.
*/
static bool query_one_row(MXS_MONITORED_SERVER *database, const char* query, unsigned int expected_cols,
StringVector* output)
{
bool rval = false;
MYSQL_RES *result;
if (mxs_mysql_query(database->con, query) == 0 && (result = mysql_store_result(database->con)) != NULL)
{
unsigned int columns = mysql_field_count(database->con);
if (columns != expected_cols)
{
mysql_free_result(result);
MXS_ERROR("Unexpected result for '%s'. Expected %d columns, got %d. MySQL Version: %s",
query, expected_cols, columns, database->server->version_string);
}
else
{
MYSQL_ROW row = mysql_fetch_row(result);
if (row)
{
for (unsigned int i = 0; i < columns; i++)
{
output->push_back((row[i] != NULL) ? row[i] : "");
}
rval = true;
}
else
{
MXS_ERROR("Query '%s' returned no rows.", query);
}
mysql_free_result(result);
}
}
else
{
mon_report_query_error(database);
}
return rval;
}
/**
* Query a few miscellaneous replication settings.
*
* @param database The slave server to query
* @param info Where to save results
* @return True on success
*/
static bool update_replication_settings(MXS_MONITORED_SERVER *database, MySqlServerInfo* info)
{
StringVector row;
bool ok = query_one_row(database, "SELECT @@gtid_strict_mode, @@log_bin, @@log_slave_updates;", 3, &row);
if (ok)
{
info->rpl_settings.gtid_strict_mode = (row[0] == "1");
info->rpl_settings.log_bin = (row[1] == "1");
info->rpl_settings.log_slave_updates = (row[2] == "1");
}
return ok;
}
/**
* Query gtid_slave_pos and save it to the server info object.
*
* @param database The server to query.
* @param domain Which gtid domain should be saved.
* @param info Server info structure for saving result.
*/
static void update_gtid_slave_pos(MXS_MONITORED_SERVER *database, int64_t domain, MySqlServerInfo* info)
{
StringVector row;
if (query_one_row(database, "SELECT @@gtid_slave_pos;", 1, &row))
{
info->gtid_slave_pos = Gtid(row.front().c_str(), domain);
}
}