Merge branch 'develop' into binlog_server_wait_data

This commit is contained in:
MassimilianoPinto
2016-09-15 14:51:00 +02:00
30 changed files with 1290 additions and 352 deletions

View File

@ -6,18 +6,8 @@ os: linux
env:
global:
- MARIADB_URL=https://downloads.mariadb.org/interstitial/mariadb-5.5.48/bintar-linux-glibc_214-x86_64/mariadb-5.5.48-linux-glibc_214-x86_64.tar.gz/from/http%3A//mirror.netinch.com/pub/mariadb/
- MARIADB_TAR=mariadb-5.5.48-linux-glibc_214-x86_64.tar.gz
- MARIADB_DIR=mariadb-5.5.48-linux-x86_64
- secure: "kfzqiIq1XhZ89XYsnqFhPKr5UWB+W4fYAYpOYOLgWMmqfjwqQTm1nN/A6TuFmdbTrzB6hLawsxIUrPS+QKs4TI8tTQMRZ8IZV4TIUQVa7SNQljwrKvnSu0fSoqpPrvXxjEjbTlvpo7X5EKCyCB0Xz6NaYVJIvE9bYnwCEAJw30k="
# prepare the environment
before_script:
# get mariadb packages from mariadb.org
- chmod +x .travis/download_mariadb.sh
- .travis/download_mariadb.sh
# actual compilation commands
script:
- chmod +x .travis/build_maxscale.sh

View File

@ -8,16 +8,15 @@
echo TRAVIS_BUILD_DIR: ${TRAVIS_BUILD_DIR}
echo MARIADB_DIR: ${MARIADB_DIR}
mkdir build
cd build
cmake .. -DCMAKE_INSTALL_PREFIX=/usr -DMYSQL_EMBEDDED_INCLUDE_DIR=${TRAVIS_BUILD_DIR}/${MARIADB_DIR}/include/ -DMYSQL_EMBEDDED_LIBRARIES=${TRAVIS_BUILD_DIR}/${MARIADB_DIR}/lib/libmysqld.a -DERRMSG=${TRAVIS_BUILD_DIR}/${MARIADB_DIR}/share/english/errmsg.sys
cmake .. -DCMAKE_INSTALL_PREFIX=/usr -DBUILD_TESTS=Y
make VERBOSE=1
make test
sudo make install
sudo make testcore
sudo ./postinst
maxscale --version

View File

@ -1,12 +0,0 @@
#/bin/sh -f
# these environment variables are set in .travis.yml
# MARIADB_URL=https://downloads.mariadb.org/interstitial/mariadb-5.5.48/bintar-linux-glibc_214-x86_64/mariadb-5.5.48-linux-glibc_214-x86_64.tar.gz/from/http%3A//mirror.netinch.com/pub/mariadb/
# MARIADB_TAR=mariadb-5.5.48-linux-glibc_214-x86_64.tar.gz
# MARIADB_DIR=mariadb-5.5.48-linux-x86_64
# get mariadb
wget --content-disposition ${MARIADB_URL}
# unompress
tar -axf ${MARIADB_TAR}

View File

@ -62,9 +62,9 @@ storage_args=path=/usr/maxscale/cache/rocksdb
Specifies whether any or only fully qualified references are allowed in
queries stored to the cache.
```
allowed_references=[fully-qualified|any]
allowed_references=[qualified|any]
```
The default is `fully-qualified`, which means that only queries where
The default is `qualified`, which means that only queries where
the database name is included in the table name are subject to caching.
```
select col from db.tbl;
@ -87,13 +87,13 @@ The setting can be changed to `any`, provided fully qualified names
are always used or if the names of tables in different databases are
different.
#### `maximum_resultset_size`
#### `max_resultset_size`
Specifies the maximum size a resultset can have, measured in kibibytes,
in order to be stored in the cache. A resultset larger than this, will
not be stored.
```
maximum_resultset_size=64
max_resultset_size=64
```
The default value is TBD.

View File

@ -97,6 +97,32 @@ Enable support for MySQL 5.1 replication monitoring. This is needed if a MySQL s
mysql51_replication=true
```
### `multimaster`
Detect multi-master replication topologies. This feature is disabled by default.
When enabled, the multi-master detection looks for the root master servers in
the replication clusters. These masters can be found by detecting cycles in the
graph created by the servers. When a cycle is detected, it is assigned a master
group ID. Every master in a master group will receive the Master status. The
special group ID 0 is assigned to all servers which are not a part of a
multi-master replication cycle.
If one or more masters in a group has the `@@read_only` system variable set to
`ON`, those servers will receive the Slave status even though they are in the
multi-master group. Slave servers with `@@read_only` disabled will never receive
the master status.
By setting the servers into read-only mode, the user can control which
server receive the master status. To do this:
- Enable `@@read_only` on all servers (preferrably through the configuration file)
- Manually disable `@@read_only` on the server which should be the master
This functionality is similar to the [Multi-Master Monitor](MM-Monitor.md)
functionality. The only difference is that the MySQL monitor will also detect
traditional Master-Slave topologies.
## Example 1 - Monitor script
Here is an example shell script which sends an email to an admin when a server goes down.

View File

@ -16,6 +16,9 @@ if (TARGET_COMPONENT)
list(FIND TARGET_COMPONENT "all" BUILD_ALL)
endif()
# Name of the common core library
set(MAXSCALE_CORE maxscale-common)
#
# Installation functions for MaxScale
#
@ -56,6 +59,8 @@ function(install_module target component)
install(TARGETS ${target} DESTINATION ${MAXSCALE_LIBDIR} COMPONENT "${component}")
endif()
# Make all modules dependent on the core
add_dependencies(${target} ${MAXSCALE_CORE})
endfunction()
# Installation functions for interpreted scripts.

View File

@ -1,6 +1,6 @@
add_library(maxscale-common SHARED adminusers.c alloc.c atomic.c buffer.c config.c dbusers.c dcb.c filter.c externcmd.c gwbitmask.c gwdirs.c gw_utils.c hashtable.c hint.c housekeeper.c listmanager.c load_utils.c log_manager.cc maxscale_pcre2.c memlog.c misc.c mlist.c modutil.c monitor.c queuemanager.c query_classifier.c poll.c random_jkiss.c resultset.c secrets.c server.c service.c session.c slist.c spinlock.c thread.c users.c utils.c skygw_utils.cc statistics.c listener.c gw_ssl.c mysql_utils.c mysql_binlog.c)
target_link_libraries(maxscale-common ${MARIADB_CONNECTOR_LIBRARIES} ${LZMA_LINK_FLAGS} ${PCRE2_LIBRARIES} ${CURL_LIBRARIES} ssl aio pthread crypt dl crypto inih z rt m stdc++)
target_link_libraries(maxscale-common ${MARIADB_CONNECTOR_LIBRARIES} ${LZMA_LINK_FLAGS} ${PCRE2_LIBRARIES} ${CURL_LIBRARIES} ssl pthread crypt dl crypto inih z rt m stdc++)
if(WITH_JEMALLOC)
target_link_libraries(maxscale-common ${JEMALLOC_LIBRARIES})

View File

@ -184,6 +184,7 @@ static char *monitor_params[] =
"available_when_donor",
"disable_master_role_setting",
"use_priority",
"multimaster",
NULL
};

View File

@ -92,6 +92,15 @@
#include <sys/un.h>
#include <maxscale/alloc.h>
#if defined(FAKE_CODE)
unsigned char dcb_fake_write_errno[10240];
__int32_t dcb_fake_write_ev[10240];
bool fail_next_backend_fd;
bool fail_next_client_fd;
int fail_next_accept;
int fail_accept_errno;
#endif /* FAKE_CODE */
/* The list of all DCBs */
static LIST_CONFIG DCBlist =
{LIST_TYPE_RECYCLABLE, sizeof(DCB), SPINLOCK_INIT};

View File

@ -491,88 +491,52 @@ int modutil_send_mysql_err_packet(DCB *dcb,
}
/**
* Buffer contains at least one of the following:
* complete [complete] [partial] mysql packet
* Return the first packet from a buffer.
*
* return pointer to gwbuf containing a complete packet or
* NULL if no complete packet was found.
* @param p_readbuf Pointer to pointer to GWBUF. If the GWBUF contains a
* complete packet, after the call it will have been updated
* to begin at the byte following the packet.
*
* @return Pointer to GWBUF if the buffer contained at least one complete packet,
* otherwise NULL.
*
* @attention The returned GWBUF is not necessarily contiguous.
*/
GWBUF* modutil_get_next_MySQL_packet(GWBUF** p_readbuf)
{
GWBUF* packetbuf;
GWBUF* readbuf;
size_t buflen;
size_t packetlen;
size_t totalbuflen;
uint8_t* data;
size_t nbytes_copied = 0;
uint8_t* target;
GWBUF *packet = NULL;
GWBUF *readbuf = *p_readbuf;
readbuf = *p_readbuf;
if (readbuf)
{
CHK_GWBUF(readbuf);
if (readbuf == NULL)
{
packetbuf = NULL;
goto return_packetbuf;
}
CHK_GWBUF(readbuf);
size_t totalbuflen = gwbuf_length(readbuf);
if (totalbuflen >= MYSQL_HEADER_LEN)
{
size_t packetlen;
if (GWBUF_EMPTY(readbuf))
{
packetbuf = NULL;
goto return_packetbuf;
}
totalbuflen = gwbuf_length(readbuf);
if (totalbuflen < MYSQL_HEADER_LEN)
{
packetbuf = NULL;
goto return_packetbuf;
if (GWBUF_LENGTH(readbuf) >= 3) // The length is in the 3 first bytes.
{
uint8_t *data = (uint8_t *)GWBUF_DATA((readbuf));
packetlen = MYSQL_GET_PACKET_LEN(data) + 4;
}
else
{
// The header is split between two GWBUFs.
uint8_t data[3];
gwbuf_copy_data(readbuf, 0, 3, data);
packetlen = MYSQL_GET_PACKET_LEN(data) + 4;
}
if (packetlen <= totalbuflen)
{
packet = gwbuf_split(p_readbuf, packetlen);
}
}
}
if (GWBUF_LENGTH(readbuf) >= 3) // The length is in the 3 first bytes.
{
data = (uint8_t *)GWBUF_DATA((readbuf));
packetlen = MYSQL_GET_PACKET_LEN(data) + 4;
}
else
{
// The header is split between two GWBUFs.
uint8_t length[3];
gwbuf_copy_data(readbuf, 0, 3, length);
packetlen = MYSQL_GET_PACKET_LEN(length) + 4;
}
/** packet is incomplete */
if (packetlen > totalbuflen)
{
packetbuf = NULL;
goto return_packetbuf;
}
packetbuf = gwbuf_alloc(packetlen);
target = GWBUF_DATA(packetbuf);
packetbuf->gwbuf_type = readbuf->gwbuf_type; /*< Copy the type too */
/**
* Copy first MySQL packet to packetbuf and leave posible other
* packets to read buffer.
*/
while (nbytes_copied < packetlen && totalbuflen > 0)
{
uint8_t* src = GWBUF_DATA((*p_readbuf));
size_t bytestocopy;
buflen = GWBUF_LENGTH((*p_readbuf));
bytestocopy = MIN(buflen, packetlen - nbytes_copied);
memcpy(target + nbytes_copied, src, bytestocopy);
*p_readbuf = gwbuf_consume((*p_readbuf), bytestocopy);
totalbuflen = gwbuf_length((*p_readbuf));
nbytes_copied += bytestocopy;
}
ss_dassert(buflen == 0 || nbytes_copied == packetlen);
return_packetbuf:
return packetbuf;
return packet;
}
/**

View File

@ -619,7 +619,7 @@ server_status(SERVER *server)
{
char *status = NULL;
if (NULL == server || (status = (char *)MXS_MALLOC(256)) == NULL)
if (NULL == server || (status = (char *)MXS_MALLOC(512)) == NULL)
{
return NULL;
}
@ -632,6 +632,10 @@ server_status(SERVER *server)
{
strcat(status, "Master, ");
}
if (server->status & SERVER_RELAY_MASTER)
{
strcat(status, "Relay Master, ");
}
if (server->status & SERVER_SLAVE)
{
strcat(status, "Slave, ");

View File

@ -25,6 +25,10 @@
// To ensure that ss_info_assert asserts also when builing in non-debug mode.
#if !defined(SS_DEBUG)
#define SS_DEBUG
int debug_check_fail = 1;
#else
// This is defined in the queuemanager code but only in debug builds
extern int debug_check_fail;
#endif
#if defined(NDEBUG)
#undef NDEBUG
@ -44,8 +48,6 @@
*
*/
extern int debug_check_fail;
#define TEST_QUEUE_SIZE 5
#define HEARTBEATS_TO_EXPIRE 3
#define NUMBER_OF_THREADS 4

View File

@ -161,6 +161,9 @@ typedef struct gwbuf
/*< Check that the data in a buffer has the SQL marker*/
#define GWBUF_IS_SQL(b) (0x03 == GWBUF_DATA_CHAR(b,4))
/*< Check whether the buffer is contiguous*/
#define GWBUF_IS_CONTIGUOUS(b) (((b) == NULL) || ((b)->next == NULL))
/*< True if all bytes in the buffer have been consumed */
#define GWBUF_EMPTY(b) ((char *)(b)->start >= (char *)(b)->end)

View File

@ -293,12 +293,12 @@ typedef enum
} DCB_USAGE;
#if defined(FAKE_CODE)
unsigned char dcb_fake_write_errno[10240];
__int32_t dcb_fake_write_ev[10240];
bool fail_next_backend_fd;
bool fail_next_client_fd;
int fail_next_accept;
int fail_accept_errno;
extern unsigned char dcb_fake_write_errno[10240];
extern __int32_t dcb_fake_write_ev[10240];
extern bool fail_next_backend_fd;
extern bool fail_next_client_fd;
extern int fail_next_accept;
extern int fail_accept_errno;
#endif /* FAKE_CODE */
/* A few useful macros */

View File

@ -129,6 +129,7 @@ typedef struct server
#define SERVER_MASTER_STICKINESS 0x0100 /**<< Server Master stickiness */
#define SERVER_AUTH_ERROR 0x1000 /**<< Authentication error from monitor */
#define SERVER_STALE_SLAVE 0x2000 /**<< Slave status is possible even without a master */
#define SERVER_RELAY_MASTER 0x4000 /**<< Server is a relay master */
/**
* Is the server running - the macro returns true if the server is marked as running

View File

@ -2,3 +2,5 @@ add_library(cache SHARED cache.c storage.c)
target_link_libraries(cache maxscale-common)
set_target_properties(cache PROPERTIES VERSION "1.0.0")
install_module(cache experimental)
add_subdirectory(storage)

View File

@ -22,7 +22,17 @@
static char VERSION_STRING[] = "V1.0.0";
static const int DEFAULT_TTL = 10;
typedef enum cache_references
{
CACHE_REFERENCES_ANY,
CACHE_REFERENCES_QUALIFIED
} cache_references_t;
#define DEFAULT_ALLOWED_REFERENCES CACHE_REFERENCES_QUALIFIED
// Bytes
#define DEFAULT_MAX_RESULTSET_SIZE 64 * 1024
// Seconds
#define DEFAULT_TTL 10
static FILTER *createInstance(const char *name, char **options, FILTER_PARAMETER **);
static void *newSession(FILTER *instance, SESSION *session);
@ -88,16 +98,32 @@ FILTER_OBJECT *GetModuleObject()
// Implementation
//
typedef struct cache_config
{
cache_references_t allowed_references;
uint32_t max_resultset_size;
const char *storage;
const char *storage_args;
uint32_t ttl;
} CACHE_CONFIG;
typedef struct cache_instance
{
const char *name;
const char *storage_name;
const char *storage_args;
uint32_t ttl; // Time to live in seconds.
CACHE_STORAGE_MODULE *module;
CACHE_STORAGE *storage;
const char *name;
CACHE_CONFIG config;
CACHE_STORAGE_MODULE *module;
CACHE_STORAGE *storage;
} CACHE_INSTANCE;
static const CACHE_CONFIG DEFAULT_CONFIG =
{
DEFAULT_ALLOWED_REFERENCES,
DEFAULT_MAX_RESULTSET_SIZE,
NULL,
NULL,
DEFAULT_TTL
};
typedef struct cache_session_data
{
CACHE_STORAGE_API *api; /**< The storage API to be used. */
@ -131,9 +157,7 @@ static bool route_using_cache(CACHE_INSTANCE *instance,
*/
static FILTER *createInstance(const char *name, char **options, FILTER_PARAMETER **params)
{
const char *storage_name = NULL;
const char *storage_args = NULL;
uint32_t ttl = DEFAULT_TTL;
CACHE_CONFIG config = DEFAULT_CONFIG;
bool error = false;
@ -141,13 +165,44 @@ static FILTER *createInstance(const char *name, char **options, FILTER_PARAMETER
{
const FILTER_PARAMETER *param = params[i];
if (strcmp(param->name, "storage_name") == 0)
if (strcmp(param->name, "allowed_references") == 0)
{
storage_name = param->value;
if (strcmp(param->value, "qualified") == 0)
{
config.allowed_references = CACHE_REFERENCES_QUALIFIED;
}
else if (strcmp(param->value, "any") == 0)
{
config.allowed_references = CACHE_REFERENCES_ANY;
}
else
{
MXS_ERROR("Unknown value '%s' for parameter '%s'.", param->value, param->name);
error = true;
}
}
else if (strcmp(param->name, "max_resultset_size") == 0)
{
int v = atoi(param->value);
if (v > 0)
{
config.max_resultset_size = v;
}
else
{
MXS_ERROR("The value of the configuration entry '%s' must "
"be an integer larger than 0.", param->name);
error = true;
}
}
else if (strcmp(param->name, "storage_args") == 0)
{
storage_args = param->value;
config.storage_args = param->value;
}
else if (strcmp(param->name, "storage") == 0)
{
config.storage = param->value;
}
else if (strcmp(param->name, "ttl") == 0)
{
@ -155,12 +210,12 @@ static FILTER *createInstance(const char *name, char **options, FILTER_PARAMETER
if (v > 0)
{
ttl = v;
config.ttl = v;
}
else
{
MXS_ERROR("The value of the configuration entry 'ttl' must "
"be an integer larger than 0.");
MXS_ERROR("The value of the configuration entry '%s' must "
"be an integer larger than 0.", param->name);
error = true;
}
}
@ -177,22 +232,20 @@ static FILTER *createInstance(const char *name, char **options, FILTER_PARAMETER
{
if ((cinstance = MXS_CALLOC(1, sizeof(CACHE_INSTANCE))) != NULL)
{
CACHE_STORAGE_MODULE *module = cache_storage_open(storage_name);
CACHE_STORAGE_MODULE *module = cache_storage_open(config.storage);
if (module)
{
CACHE_STORAGE *storage = module->api->createInstance(name, ttl, 0, NULL);
CACHE_STORAGE *storage = module->api->createInstance(name, config.ttl, 0, NULL);
if (storage)
{
cinstance->name = name;
cinstance->storage_name = storage_name;
cinstance->storage_args = storage_args;
cinstance->ttl = ttl;
cinstance->config = config;
cinstance->module = module;
cinstance->storage = storage;
MXS_NOTICE("Cache storage %s opened and initialized.", storage_name);
MXS_NOTICE("Cache storage %s opened and initialized.", config.storage);
}
else
{

View File

@ -0,0 +1 @@
add_subdirectory(storage_rocksdb)

View File

@ -0,0 +1,49 @@
# Build RocksDB
if ((CMAKE_CXX_COMPILER_ID STREQUAL "GNU") AND (NOT (CMAKE_CXX_COMPILER_VERSION VERSION_LESS 4.7)))
message(STATUS "GCC >= 4.7, RocksDB is built.")
set(ROCKSDB_REPO "https://github.com/facebook/rocksdb.git"
CACHE STRING "RocksDB Git repository")
# Release 4.9 of RocksDB
set(ROCKSDB_TAG "v4.9"
CACHE STRING "RocksDB Git tag")
set(ROCKSDB_SUBPATH "/server/modules/filter/cache/storage/storage_rocksdb/RocksDB-prefix/src/RocksDB")
set(ROCKSDB_ROOT ${CMAKE_BINARY_DIR}${ROCKSDB_SUBPATH})
ExternalProject_Add(RocksDB
GIT_REPOSITORY ${ROCKSDB_REPO}
GIT_TAG ${ROCKSDB_TAG}
CONFIGURE_COMMAND ""
BINARY_DIR ${ROCKSDB_ROOT}
CONFIGURE_COMMAND ""
BUILD_COMMAND make DISABLE_JEMALLOC=1 EXTRA_CXXFLAGS=-fPIC static_lib
INSTALL_COMMAND "")
set(ROCKSDB_BUILT TRUE CACHE INTERNAL "")
set(ROCKSDB_INCLUDE_DIRS ${ROCKSDB_ROOT}/include ${ROCKSDB_ROOT})
set(ROCKSDB_LIB_DIR ${ROCKSDB_ROOT})
set(ROCKSDB_LIB librocksdb.a)
# RocksDB supports several compression libraries and automatically
# uses them if it finds the headers in the environment. Consequently,
# we must ensure that a user of RocksDB can link to the needed
# libraries.
#
# ROCKSDB_LINK_LIBS specifies that libraries a module using ROCKSDB_LIB
# must link with.
find_package(BZip2)
if (BZIP2_FOUND)
set(ROCKSDB_LINK_LIBS ${ROCKSDB_LINK_LIBS} ${BZIP2_LIBRARIES})
endif()
find_package(ZLIB)
if (ZLIB_FOUND)
set(ROCKSDB_LINK_LIBS ${ROCKSDB_LINK_LIBS} ${ZLIB_LIBRARIES})
endif()
else()
message(STATUS "RocksDB requires GCC >= 4.7, only ${CMAKE_CXX_COMPILER_VERSION} available.")
endif()

View File

@ -0,0 +1,26 @@
include(BuildRocksDB.cmake)
if (ROCKSDB_BUILT)
message(STATUS "RocksDB is built, storage_rocksdb will be built.")
set(CMAKE_CXX_FLAGS "-std=c++11 ${CMAKE_CXX_FLAGS} -DROCKSDB_PLATFORM_POSIX")
set(CMAKE_CXX_FLAGS_DEBUG "-std=c++11 ${CMAKE_CXX_FLAGS_DEBUG} -DROCKSDB_PLATFORM_POSIX")
set(CMAKE_CXX_FLAGS_RELEASE "-std=c++11 ${CMAKE_CXX_FLAGS_RELEASE} -DROCKSDB_PLATFORM_POSIX")
set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "-std=c++11 ${CMAKE_CXX_FLAGS_RELWITHDEBINFO} -DROCKSDB_PLATFORM_POSIX")
include_directories(${ROCKSDB_INCLUDE_DIRS})
link_directories(${ROCKSDB_LIB_DIR})
add_library(storage_rocksdb SHARED
rocksdbinternals.cc
rocksdbstorage.cc
storage_rocksdb.cc
)
add_dependencies(storage_rocksdb RocksDB)
target_link_libraries(storage_rocksdb maxscale-common ${ROCKSDB_LIB} ${ROCKSDB_LINK_LIBS})
set_target_properties(storage_rocksdb PROPERTIES VERSION "1.0.0")
set_target_properties(storage_rocksdb PROPERTIES LINK_FLAGS -Wl,-z,defs)
install_module(storage_rocksdb experimental)
else()
message("RocksDB not built, storage_rocksdb cannot be built.")
endif()

View File

@ -0,0 +1,46 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include "rocksdbinternals.h"
#include <rocksdb/env.h>
#include <util/coding.h>
/**
* Check whether a value is stale or not.
*
* @param value A value with the timestamp at the end.
* @param ttl The time-to-live in seconds.
* @param pEnv The used RocksDB environment instance.
*
* @return True of the value is stale.
*
* Basically a copy from RocksDB/utilities/ttl/db_ttl_impl.cc:160
* but note that the here we claim the data is stale if we fail to
* get the time while the original code claims it is fresh.
*/
bool RocksDBInternals::IsStale(const rocksdb::Slice& value, int32_t ttl, rocksdb::Env* pEnv)
{
if (ttl <= 0)
{ // Data is fresh if TTL is non-positive
return false;
}
int64_t curtime;
if (!pEnv->GetCurrentTime(&curtime).ok())
{
return true; // Treat the data as stale if could not get current time
}
int32_t timestamp = rocksdb::DecodeFixed32(value.data() + value.size() - TS_LENGTH);
return (timestamp + ttl) < curtime;
}

View File

@ -0,0 +1,41 @@
#pragma once
#ifndef _ROCKSDBINTERNALS_H
#define _ROCKSDBINTERNALS_H
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include "storage_rocksdb.h"
#include <rocksdb/env.h>
#include <rocksdb/version.h>
#include <rocksdb/slice.h>
#if (ROCKSDB_MAJOR != 4) || (ROCKSDB_MINOR != 9)
#error RocksDBStorage was created with knowledge of RocksDB 4.9 internals.\
The version used is something else. Ensure the knowledge is still applicable.
#endif
namespace RocksDBInternals
{
/**
* The length of the timestamp when stashed after the actual value.
*
* See RocksDB/utilities/ttl/db_ttl_impl.h
*/
static const uint32_t TS_LENGTH = sizeof(int32_t);
bool IsStale(const rocksdb::Slice& slice, int32_t ttl, rocksdb::Env* pEnv);
}
#endif

View File

@ -0,0 +1,201 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include "rocksdbstorage.h"
#include <openssl/sha.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <rocksdb/env.h>
#include <gwdirs.h>
#include "rocksdbinternals.h"
using std::string;
using std::unique_ptr;
namespace
{
string u_storageDirectory;
const size_t ROCKSDB_KEY_LENGTH = SHA512_DIGEST_LENGTH;
// See https://github.com/facebook/rocksdb/wiki/Basic-Operations#thread-pools
// These figures should perhaps depend upon the number of cache instances.
const size_t ROCKSDB_N_LOW_THREADS = 2;
const size_t ROCKSDB_N_HIGH_THREADS = 1;
}
//private
RocksDBStorage::RocksDBStorage(unique_ptr<rocksdb::DBWithTTL>& sDb,
const string& name,
const string& path,
uint32_t ttl)
: m_sDb(std::move(sDb))
, m_name(name)
, m_path(path)
, m_ttl(ttl)
{
}
RocksDBStorage::~RocksDBStorage()
{
}
//static
bool RocksDBStorage::Initialize()
{
bool initialized = true;
u_storageDirectory = get_cachedir();
u_storageDirectory += "/storage_rocksdb";
if (mkdir(u_storageDirectory.c_str(), S_IRWXU) == 0)
{
MXS_NOTICE("Created storage directory %s.", u_storageDirectory.c_str());
}
else if (errno != EEXIST)
{
initialized = false;
char errbuf[STRERROR_BUFLEN];
MXS_ERROR("Failed to create storage directory %s: %s",
u_storageDirectory.c_str(),
strerror_r(errno, errbuf, sizeof(errbuf)));
}
else
{
auto pEnv = rocksdb::Env::Default();
pEnv->SetBackgroundThreads(ROCKSDB_N_LOW_THREADS, rocksdb::Env::LOW);
pEnv->SetBackgroundThreads(ROCKSDB_N_HIGH_THREADS, rocksdb::Env::HIGH);
}
return initialized;
}
//static
RocksDBStorage* RocksDBStorage::Create(const char* zName, uint32_t ttl, int argc, char* argv[])
{
ss_dassert(zName);
string path(u_storageDirectory);
path += "/";
path += zName;
rocksdb::Options options;
options.env = rocksdb::Env::Default();
options.max_background_compactions = ROCKSDB_N_LOW_THREADS;
options.max_background_flushes = ROCKSDB_N_HIGH_THREADS;
options.create_if_missing = true;
rocksdb::DBWithTTL* pDb;
rocksdb::Status status = rocksdb::DBWithTTL::Open(options, path, &pDb, ttl);
RocksDBStorage* pStorage = nullptr;
if (status.ok())
{
unique_ptr<rocksdb::DBWithTTL> sDb(pDb);
pStorage = new RocksDBStorage(sDb, zName, path, ttl);
}
else
{
MXS_ERROR("Could not open RocksDB database %s using path %s: %s",
zName, path.c_str(), status.ToString().c_str());
}
return pStorage;
}
cache_result_t RocksDBStorage::getKey(const GWBUF* pQuery, char* pKey)
{
// ss_dassert(gwbuf_is_contiguous(pQuery));
const uint8_t* pData = static_cast<const uint8_t*>(GWBUF_DATA(pQuery));
size_t len = MYSQL_GET_PACKET_LEN(pData) - 1; // Subtract 1 for packet type byte.
const uint8_t* pSql = &pData[5]; // Symbolic constant for 5?
memset(pKey, 0, CACHE_KEY_MAXLEN);
SHA512(pSql, len, reinterpret_cast<unsigned char*>(pKey));
return CACHE_RESULT_OK;
}
cache_result_t RocksDBStorage::getValue(const char* pKey, GWBUF** ppResult)
{
// Use the root DB so that we get the value *with* the timestamp at the end.
rocksdb::DB* pDb = m_sDb->GetRootDB();
rocksdb::Slice key(pKey, ROCKSDB_KEY_LENGTH);
string value;
rocksdb::Status status = pDb->Get(rocksdb::ReadOptions(), key, &value);
cache_result_t result = CACHE_RESULT_ERROR;
switch (status.code())
{
case rocksdb::Status::kOk:
if (value.length() >= RocksDBInternals::TS_LENGTH)
{
if (!RocksDBInternals::IsStale(value, m_ttl, rocksdb::Env::Default()))
{
size_t length = value.length() - RocksDBInternals::TS_LENGTH;
*ppResult = gwbuf_alloc(length);
if (*ppResult)
{
memcpy(GWBUF_DATA(*ppResult), value.data(), length);
result = CACHE_RESULT_OK;
}
}
else
{
MXS_NOTICE("Cache item is stale, not using.");
}
}
else
{
MXS_ERROR("RocksDB value too short. Database corrupted?");
result = CACHE_RESULT_ERROR;
}
break;
case rocksdb::Status::kNotFound:
result = CACHE_RESULT_NOT_FOUND;
break;
default:
MXS_ERROR("Failed to look up value: %s", status.ToString().c_str());
}
return result;
}
cache_result_t RocksDBStorage::putValue(const char* pKey, const GWBUF* pValue)
{
// ss_dassert(gwbuf_is_contiguous(pValue));
rocksdb::Slice key(pKey, ROCKSDB_KEY_LENGTH);
rocksdb::Slice value(static_cast<const char*>(GWBUF_DATA(pValue)), GWBUF_LENGTH(pValue));
rocksdb::Status status = m_sDb->Put(rocksdb::WriteOptions(), key, value);
return status.ok() ? CACHE_RESULT_OK : CACHE_RESULT_ERROR;
}

View File

@ -0,0 +1,50 @@
#ifndef _ROCKSDBSTORAGE_H
#define _ROCKSDBSTORAGE_H
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include "storage_rocksdb.h"
#include <memory>
#include <string>
#include <rocksdb/utilities/db_ttl.h>
#include "../../cache_storage_api.h"
class RocksDBStorage
{
public:
static bool Initialize();
static RocksDBStorage* Create(const char* zName, uint32_t ttl, int argc, char* argv[]);
~RocksDBStorage();
cache_result_t getKey(const GWBUF* pQuery, char* pKey);
cache_result_t getValue(const char* pKey, GWBUF** ppResult);
cache_result_t putValue(const char* pKey, const GWBUF* pValue);
private:
RocksDBStorage(std::unique_ptr<rocksdb::DBWithTTL>& sDb,
const std::string& name,
const std::string& path,
uint32_t ttl);
RocksDBStorage(const RocksDBStorage&) = delete;
RocksDBStorage& operator = (const RocksDBStorage&) = delete;
private:
std::unique_ptr<rocksdb::DBWithTTL> m_sDb;
std::string m_name;
std::string m_path;
uint32_t m_ttl;
};
#endif

View File

@ -0,0 +1,151 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include "storage_rocksdb.h"
#include "../../cache_storage_api.h"
#include "rocksdbstorage.h"
namespace
{
bool initialize()
{
return RocksDBStorage::Initialize();
}
CACHE_STORAGE* createInstance(const char* zName, uint32_t ttl, int argc, char* argv[])
{
CACHE_STORAGE* pStorage = 0;
try
{
pStorage = reinterpret_cast<CACHE_STORAGE*>(RocksDBStorage::Create(zName, ttl, argc, argv));
}
catch (const std::bad_alloc&)
{
MXS_OOM();
}
catch (const std::exception& x)
{
MXS_ERROR("Standard exception caught: %s", x.what());
}
catch (...)
{
MXS_ERROR("Unknown exception caught.");
}
return pStorage;
}
void freeInstance(CACHE_STORAGE* pInstance)
{
delete reinterpret_cast<RocksDBStorage*>(pInstance);
}
cache_result_t getKey(CACHE_STORAGE* pStorage,
const GWBUF* pQuery,
char* pKey)
{
cache_result_t result = CACHE_RESULT_ERROR;
try
{
result = reinterpret_cast<RocksDBStorage*>(pStorage)->getKey(pQuery, pKey);
}
catch (const std::bad_alloc&)
{
MXS_OOM();
}
catch (const std::exception& x)
{
MXS_ERROR("Standard exception caught: %s", x.what());
}
catch (...)
{
MXS_ERROR("Unknown exception caught.");
}
return result;
}
cache_result_t getValue(CACHE_STORAGE* pStorage, const char* pKey, GWBUF** ppResult)
{
cache_result_t result = CACHE_RESULT_ERROR;
try
{
result = reinterpret_cast<RocksDBStorage*>(pStorage)->getValue(pKey, ppResult);
}
catch (const std::bad_alloc&)
{
MXS_OOM();
}
catch (const std::exception& x)
{
MXS_ERROR("Standard exception caught: %s", x.what());
}
catch (...)
{
MXS_ERROR("Unknown exception caught.");
}
return result;
}
cache_result_t putValue(CACHE_STORAGE* pStorage,
const char* pKey,
const GWBUF* pValue)
{
cache_result_t result = CACHE_RESULT_ERROR;
try
{
result = reinterpret_cast<RocksDBStorage*>(pStorage)->putValue(pKey, pValue);
}
catch (const std::bad_alloc&)
{
MXS_OOM();
}
catch (const std::exception& x)
{
MXS_ERROR("Standard exception caught: %s", x.what());
}
catch (...)
{
MXS_ERROR("Unknown exception caught.");
}
return result;
}
}
extern "C"
{
CACHE_STORAGE_API* CacheGetStorageAPI()
{
static CACHE_STORAGE_API api =
{
initialize,
createInstance,
freeInstance,
getKey,
getValue,
putValue
};
return &api;
}
}

View File

@ -0,0 +1,19 @@
#ifndef _STORAGE_ROCKSDB_H
#define _STORAGE_ROCKSDB_H
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#define MXS_MODULE_NAME "storage_rocksdb"
#include <log_manager.h>
#endif

View File

@ -1,4 +1,4 @@
add_library(ccrfilter SHARED ccrfilter.c)
target_link_libraries(ccrfilter maxscale-common)
set_target_properties(ccrfilter PROPERTIES VERSION "1.0.0")
install_module(ccrfilter experimental)
install_module(ccrfilter core)

View File

@ -1,4 +1,4 @@
add_library(gatekeeper SHARED gatekeeper.c)
target_link_libraries(gatekeeper maxscale-common)
set_target_properties(gatekeeper PROPERTIES VERSION "1.0.0")
install_module(gatekeeper experimental)
install_module(gatekeeper core)

View File

@ -28,6 +28,7 @@
#include <modinfo.h>
#include <maxconfig.h>
#include <externcmd.h>
#include <hashtable.h>
/**
* @file mysqlmon.h - The MySQL monitor
@ -62,6 +63,7 @@ typedef struct
int replicationHeartbeat; /**< Monitor flag for MySQL replication heartbeat */
bool detectStaleMaster; /**< Monitor flag for MySQL replication Stale Master detection */
bool detectStaleSlave; /**< Monitor flag for MySQL replication Stale Master detection */
bool multimaster; /**< Detect and handle multi-master topologies */
int disableMasterFailback; /**< Monitor flag for Galera Cluster Master failback */
int availableWhenDonor; /**< Monitor flag for Galera Cluster Donor availability */
int disableMasterRoleSetting; /**< Monitor flag to disable setting master role */
@ -69,6 +71,7 @@ typedef struct
MONITOR_SERVERS *master; /**< Master server for MySQL Master/Slave replication */
char* script; /*< Script to call when state changes occur on servers */
bool events[MAX_MONITOR_EVENT]; /*< enabled events */
HASHTABLE *server_info; /**< Contains server specific information */
} MYSQL_MONITOR;
#endif

View File

@ -50,11 +50,28 @@
#include <modutil.h>
#include <maxscale/alloc.h>
extern char *strcasestr(const char *haystack, const char *needle);
/** Column positions for SHOW SLAVE STATUS */
#define MYSQL55_STATUS_BINLOG_POS 5
#define MYSQL55_STATUS_BINLOG_NAME 6
#define MYSQL55_STATUS_IO_RUNNING 10
#define MYSQL55_STATUS_SQL_RUNNING 11
#define MYSQL55_STATUS_MASTER_ID 39
/** Column positions for SHOW SLAVE STATUS */
#define MARIA10_STATUS_BINLOG_NAME 7
#define MARIA10_STATUS_BINLOG_POS 8
#define MARIA10_STATUS_IO_RUNNING 12
#define MARIA10_STATUS_SQL_RUNNING 13
#define MARIA10_STATUS_MASTER_ID 41
/** Column positions for SHOW SLAVE HOSTS */
#define SLAVE_HOSTS_SERVER_ID 0
#define SLAVE_HOSTS_HOSTNAME 1
#define SLAVE_HOSTS_PORT 2
static void monitorMain(void *);
static char *version_str = "V1.4.0";
static char *version_str = "V1.5.0";
/* @see function load_module in load_utils.c for explanation of the following
* lint directives.
@ -128,6 +145,89 @@ GetModuleObject()
{
return &MyObject;
}
/**
* Monitor specific information about a server
*/
typedef struct mysql_server_info
{
int server_id; /**< Value of @@server_id */
int master_id; /**< Master server id from SHOW SLAVE STATUS*/
int group; /**< Multi-master group where this server
belongs, 0 for servers not in groups */
bool read_only; /**< Value of @@read_only */
bool slave_configured; /**< Whether SHOW SLAVE STATUS returned rows */
bool slave_io; /**< If Slave IO thread is running */
bool slave_sql; /**< If Slave SQL thread is running */
uint64_t binlog_pos; /**< Binlog position from SHOW SLAVE STATUS */
char *binlog_name; /**< Binlog name from SHOW SLAVE STATUS */
} MYSQL_SERVER_INFO;
/** Other values are implicitly zero initialized */
#define MYSQL_SERVER_INFO_INIT {.binlog_name = ""}
void* info_copy_func(const void *val)
{
ss_dassert(val);
MYSQL_SERVER_INFO *old_val = (MYSQL_SERVER_INFO*)val;
MYSQL_SERVER_INFO *new_val = MXS_MALLOC(sizeof(MYSQL_SERVER_INFO));
char *binlog_name = MXS_STRDUP(old_val->binlog_name);
if (new_val && binlog_name)
{
*new_val = *old_val;
new_val->binlog_name = binlog_name;
}
else
{
MXS_FREE(new_val);
MXS_FREE(binlog_name);
new_val = NULL;
}
return new_val;
}
void info_free_func(void *val)
{
if (val)
{
MYSQL_SERVER_INFO *old_val = (MYSQL_SERVER_INFO*)val;
MXS_FREE(old_val->binlog_name);
MXS_FREE(old_val);
}
}
/**
* @brief Helper function that initializes the server info hashtable
*
* @param handle MySQL monitor handle
* @param database List of monitored databases
* @return True on success, false if initialization failed. At the moment
* initialization can only fail if memory allocation fails.
*/
bool init_server_info(MYSQL_MONITOR *handle, MONITOR_SERVERS *database)
{
MYSQL_SERVER_INFO info = MYSQL_SERVER_INFO_INIT;
bool rval = true;
while (database)
{
/** Delete any existing structures and replace them with empty ones */
hashtable_delete(handle->server_info, database->server);
if (!hashtable_add(handle->server_info, database->server->unique_name, &info))
{
rval = false;
break;
}
database = database->next;
}
return rval;
}
/*lint +e14 */
/**
@ -143,7 +243,7 @@ static void *
startMonitor(MONITOR *monitor, const CONFIG_PARAMETER* params)
{
MYSQL_MONITOR *handle = (MYSQL_MONITOR*) monitor->handle;
bool have_events = false, script_error = false;
bool have_events = false, script_error = false, error = false;
if (handle)
{
@ -151,10 +251,19 @@ startMonitor(MONITOR *monitor, const CONFIG_PARAMETER* params)
}
else
{
if ((handle = (MYSQL_MONITOR *) MXS_MALLOC(sizeof(MYSQL_MONITOR))) == NULL)
handle = (MYSQL_MONITOR *) MXS_MALLOC(sizeof(MYSQL_MONITOR));
HASHTABLE *server_info = hashtable_alloc(MAX_NUM_SLAVES, hashtable_item_strhash, hashtable_item_strcmp);
if (handle == NULL || server_info == NULL)
{
MXS_FREE(handle);
hashtable_free(server_info);
return NULL;
}
hashtable_memory_fns(server_info, hashtable_item_strdup, info_copy_func,
hashtable_item_free, info_free_func);
handle->server_info = server_info;
handle->shutdown = 0;
handle->id = config_get_gateway_id();
handle->replicationHeartbeat = 0;
@ -162,6 +271,7 @@ startMonitor(MONITOR *monitor, const CONFIG_PARAMETER* params)
handle->detectStaleSlave = true;
handle->master = NULL;
handle->script = NULL;
handle->multimaster = false;
handle->mysql51_replication = false;
memset(handle->events, false, sizeof(handle->events));
spinlock_init(&handle->lock);
@ -181,6 +291,10 @@ startMonitor(MONITOR *monitor, const CONFIG_PARAMETER* params)
{
handle->replicationHeartbeat = config_truth_value(params->value);
}
else if (!strcmp(params->name, "multimaster"))
{
handle->multimaster = config_truth_value(params->value);
}
else if (!strcmp(params->name, "script"))
{
if (externcmd_can_execute(params->value))
@ -214,25 +328,32 @@ startMonitor(MONITOR *monitor, const CONFIG_PARAMETER* params)
if (!check_monitor_permissions(monitor, "SHOW SLAVE STATUS"))
{
MXS_ERROR("Failed to start monitor. See earlier errors for more information.");
MXS_FREE(handle->script);
MXS_FREE(handle);
return NULL;
error = true;
}
if (script_error)
{
MXS_ERROR("Errors were found in the script configuration parameters "
"for the monitor '%s'. The script will not be used.", monitor->name);
MXS_FREE(handle->script);
handle->script = NULL;
MXS_ERROR("[%s] Errors were found in the script configuration parameters ", monitor->name);
error = true;
}
/** If no specific events are given, enable them all */
if (!have_events)
else if (!have_events)
{
/** If no specific events are given, enable them all */
memset(handle->events, true, sizeof(handle->events));
}
if (thread_start(&handle->thread, monitorMain, monitor) == NULL)
if (!init_server_info(handle, monitor->databases))
{
error = true;
}
if (error)
{
hashtable_free(handle->server_info);
MXS_FREE(handle->script);
MXS_FREE(handle);
}
else if (thread_start(&handle->thread, monitorMain, monitor) == NULL)
{
MXS_ERROR("Failed to start monitor thread for monitor '%s'.", monitor->name);
}
@ -286,229 +407,160 @@ static void diagnostics(DCB *dcb, const MONITOR *mon)
dcb_printf(dcb, "\tConnect Timeout:\t%i seconds\n", mon->connect_timeout);
dcb_printf(dcb, "\tRead Timeout:\t\t%i seconds\n", mon->read_timeout);
dcb_printf(dcb, "\tWrite Timeout:\t\t%i seconds\n", mon->write_timeout);
dcb_printf(dcb, "\tMonitored servers: ");
dcb_printf(dcb, "\nMonitored servers\n\n");
db = mon->databases;
sep = "";
while (db)
{
dcb_printf(dcb, "%s%s:%d", sep, db->server->name, db->server->port);
sep = ", ";
MYSQL_SERVER_INFO *serv_info = hashtable_fetch(handle->server_info, db->server->unique_name);
dcb_printf(dcb, "Server: %s\n", db->server->unique_name);
dcb_printf(dcb, "Server ID: %d\n", serv_info->server_id);
dcb_printf(dcb, "Read only: %s\n", serv_info->read_only ? "ON" : "OFF");
dcb_printf(dcb, "Slave configured: %s\n", serv_info->slave_configured ? "YES" : "NO");
dcb_printf(dcb, "Slave IO running: %s\n", serv_info->slave_io ? "YES" : "NO");
dcb_printf(dcb, "Slave SQL running: %s\n", serv_info->slave_sql ? "YES" : "NO");
dcb_printf(dcb, "Master ID: %d\n", serv_info->master_id);
dcb_printf(dcb, "Master binlog file: %s\n", serv_info->binlog_name);
dcb_printf(dcb, "Master binlog position: %lu\n", serv_info->binlog_pos);
if (handle->multimaster)
{
dcb_printf(dcb, "Master group: %d\n", serv_info->group);
}
dcb_printf(dcb, "\n");
db = db->next;
}
dcb_printf(dcb, "\n");
}
static inline void monitor_mysql100_db(MONITOR_SERVERS* database)
enum mysql_server_version
{
int isslave = 0;
MYSQL_RES* result;
MYSQL_ROW row;
MYSQL_SERVER_VERSION_100,
MYSQL_SERVER_VERSION_55,
MYSQL_SERVER_VERSION_51
};
if (mysql_query(database->con, "SHOW ALL SLAVES STATUS") == 0
static inline void monitor_mysql_db(MONITOR_SERVERS* database, MYSQL_SERVER_INFO *serv_info,
enum mysql_server_version server_version)
{
int columns, i_io_thread, i_sql_thread, i_binlog_pos, i_master_id, i_binlog_name;
const char *query;
if (server_version == MYSQL_SERVER_VERSION_100)
{
columns = 42;
query = "SHOW ALL SLAVES STATUS";
i_io_thread = MARIA10_STATUS_IO_RUNNING;
i_sql_thread = MARIA10_STATUS_SQL_RUNNING;
i_binlog_name = MARIA10_STATUS_BINLOG_NAME;
i_binlog_pos = MARIA10_STATUS_BINLOG_POS;
i_master_id = MARIA10_STATUS_MASTER_ID;
}
else
{
columns = server_version == MYSQL_SERVER_VERSION_55 ? 40 : 38;
query = "SHOW SLAVE STATUS";
i_io_thread = MYSQL55_STATUS_IO_RUNNING;
i_sql_thread = MYSQL55_STATUS_SQL_RUNNING;
i_binlog_name = MYSQL55_STATUS_BINLOG_NAME;
i_binlog_pos = MYSQL55_STATUS_BINLOG_POS;
i_master_id = MYSQL55_STATUS_MASTER_ID;
}
/** Clear old states */
monitor_clear_pending_status(database, SERVER_SLAVE | SERVER_MASTER | SERVER_RELAY_MASTER |
SERVER_STALE_STATUS | SERVER_SLAVE_OF_EXTERNAL_MASTER);
MYSQL_RES* result;
if (mysql_query(database->con, query) == 0
&& (result = mysql_store_result(database->con)) != NULL)
{
int i = 0;
long master_id = -1;
if (mysql_field_count(database->con) < 42)
if (mysql_field_count(database->con) < columns)
{
mysql_free_result(result);
MXS_ERROR("\"SHOW ALL SLAVES STATUS\" "
"returned less than the expected amount of columns. Expected 42 columns."
" MySQL Version: %s", version_str);
MXS_ERROR("\"%s\" returned less than the expected amount of columns. "
"Expected %d columns. MySQL Version: %s", query, columns, version_str);
return;
}
while ((row = mysql_fetch_row(result)))
{
/* get Slave_IO_Running and Slave_SQL_Running values*/
if (strncmp(row[12], "Yes", 3) == 0
&& strncmp(row[13], "Yes", 3) == 0)
{
isslave += 1;
}
MYSQL_ROW row = mysql_fetch_row(result);
long master_id = -1;
/* If Slave_IO_Running = Yes, assign the master_id to current server: this allows building
* the replication tree, slaves ids will be added to master(s) and we will have at least the
* root master server.
* Please note, there could be no slaves at all if Slave_SQL_Running == 'No'
*/
if (strncmp(row[12], "Yes", 3) == 0)
if (row)
{
serv_info->slave_configured = true;
int nconfigured = 0;
int nrunning = 0;
do
{
/* get Master_Server_Id values */
master_id = atol(row[41]);
if (master_id == 0)
/* get Slave_IO_Running and Slave_SQL_Running values*/
serv_info->slave_io = strncmp(row[i_io_thread], "Yes", 3) == 0;
serv_info->slave_sql = strncmp(row[i_sql_thread], "Yes", 3) == 0;
if (serv_info->slave_io && serv_info->slave_sql)
{
master_id = -1;
if (nrunning == 0)
{
/** Only check binlog name for the first running slave */
char *binlog_name = MXS_STRDUP(row[i_binlog_name]);
if (binlog_name)
{
MXS_FREE(serv_info->binlog_name);
serv_info->binlog_name = binlog_name;
serv_info->binlog_pos = atol(row[i_binlog_pos]);
}
}
nrunning++;
}
/* If Slave_IO_Running = Yes, assign the master_id to current server: this allows building
* the replication tree, slaves ids will be added to master(s) and we will have at least the
* root master server.
* Please note, there could be no slaves at all if Slave_SQL_Running == 'No'
*/
if (serv_info->slave_io && server_version != MYSQL_SERVER_VERSION_51)
{
/* Get Master_Server_Id */
master_id = atol(row[i_master_id]);
if (master_id == 0)
{
master_id = -1;
}
}
nconfigured++;
row = mysql_fetch_row(result);
}
while (row);
i++;
}
/* store master_id of current node */
memcpy(&database->server->master_id, &master_id, sizeof(long));
mysql_free_result(result);
/* If all configured slaves are running set this node as slave */
if (isslave > 0 && isslave == i)
{
isslave = 1;
/* If all configured slaves are running set this node as slave */
if (nrunning > 0 && nrunning == nconfigured)
{
monitor_set_pending_status(database, SERVER_SLAVE);
}
}
else
{
isslave = 0;
}
}
/* Remove addition info */
monitor_clear_pending_status(database, SERVER_SLAVE_OF_EXTERNAL_MASTER);
monitor_clear_pending_status(database, SERVER_STALE_STATUS);
/* Please note, the MASTER status and SERVER_SLAVE_OF_EXTERNAL_MASTER
* will be assigned in the monitorMain() via get_replication_tree() routine
*/
/* Set the Slave Role */
if (isslave)
{
monitor_set_pending_status(database, SERVER_SLAVE);
/* Avoid any possible stale Master state */
monitor_clear_pending_status(database, SERVER_MASTER);
}
else
{
/* Avoid any possible Master/Slave stale state */
monitor_clear_pending_status(database, SERVER_SLAVE);
monitor_clear_pending_status(database, SERVER_MASTER);
}
}
static inline void monitor_mysql55_db(MONITOR_SERVERS* database)
{
bool isslave = false;
MYSQL_RES* result;
MYSQL_ROW row;
if (mysql_query(database->con, "SHOW SLAVE STATUS") == 0
&& (result = mysql_store_result(database->con)) != NULL)
{
long master_id = -1;
if (mysql_field_count(database->con) < 40)
{
mysql_free_result(result);
MXS_ERROR("\"SHOW SLAVE STATUS\" "
"returned less than the expected amount of columns. Expected 40 columns."
" MySQL Version: %s", version_str);
return;
/** Query returned no rows, replication is not configured */
serv_info->slave_configured = false;
serv_info->slave_io = false;
serv_info->slave_sql = false;
serv_info->binlog_pos = 0;
serv_info->binlog_name[0] = '\0';
}
while ((row = mysql_fetch_row(result)))
{
/* get Slave_IO_Running and Slave_SQL_Running values*/
if (strncmp(row[10], "Yes", 3) == 0
&& strncmp(row[11], "Yes", 3) == 0)
{
isslave = 1;
}
/* If Slave_IO_Running = Yes, assign the master_id to current server: this allows building
* the replication tree, slaves ids will be added to master(s) and we will have at least the
* root master server.
* Please note, there could be no slaves at all if Slave_SQL_Running == 'No'
*/
if (strncmp(row[10], "Yes", 3) == 0)
{
/* get Master_Server_Id values */
master_id = atol(row[39]);
if (master_id == 0)
{
master_id = -1;
}
}
}
/* store master_id of current node */
memcpy(&database->server->master_id, &master_id, sizeof(long));
/** Store master_id of current node. For MySQL 5.1 it will be set at a later point. */
database->server->master_id = master_id;
serv_info->master_id = master_id;
mysql_free_result(result);
}
/* Remove addition info */
monitor_clear_pending_status(database, SERVER_SLAVE_OF_EXTERNAL_MASTER);
monitor_clear_pending_status(database, SERVER_STALE_STATUS);
/* Please note, the MASTER status and SERVER_SLAVE_OF_EXTERNAL_MASTER
* will be assigned in the monitorMain() via get_replication_tree() routine
*/
/* Set the Slave Role */
if (isslave)
{
monitor_set_pending_status(database, SERVER_SLAVE);
/* Avoid any possible stale Master state */
monitor_clear_pending_status(database, SERVER_MASTER);
}
else
{
/* Avoid any possible Master/Slave stale state */
monitor_clear_pending_status(database, SERVER_SLAVE);
monitor_clear_pending_status(database, SERVER_MASTER);
}
}
static inline void monitor_mysql51_db(MONITOR_SERVERS* database)
{
bool isslave = false;
MYSQL_RES* result;
MYSQL_ROW row;
if (mysql_query(database->con, "SHOW SLAVE STATUS") == 0
&& (result = mysql_store_result(database->con)) != NULL)
{
if (mysql_field_count(database->con) < 38)
{
mysql_free_result(result);
MXS_ERROR("\"SHOW SLAVE STATUS\" "
"returned less than the expected amount of columns. Expected 38 columns."
" MySQL Version: %s", version_str);
return;
}
while ((row = mysql_fetch_row(result)))
{
/* get Slave_IO_Running and Slave_SQL_Running values*/
if (strncmp(row[10], "Yes", 3) == 0
&& strncmp(row[11], "Yes", 3) == 0)
{
isslave = 1;
}
}
mysql_free_result(result);
}
/* Remove addition info */
monitor_clear_pending_status(database, SERVER_SLAVE_OF_EXTERNAL_MASTER);
monitor_clear_pending_status(database, SERVER_STALE_STATUS);
/* Please note, the MASTER status and SERVER_SLAVE_OF_EXTERNAL_MASTER
* will be assigned in the monitorMain() via get_replication_tree() routine
*/
/* Set the Slave Role */
if (isslave)
{
monitor_set_pending_status(database, SERVER_SLAVE);
/* Avoid any possible stale Master state */
monitor_clear_pending_status(database, SERVER_MASTER);
}
else
{
/* Avoid any possible Master/Slave stale state */
monitor_clear_pending_status(database, SERVER_SLAVE);
monitor_clear_pending_status(database, SERVER_MASTER);
}
}
/**
@ -550,9 +602,9 @@ static MONITOR_SERVERS *build_mysql51_replication_tree(MONITOR *mon)
while (nslaves < MAX_NUM_SLAVES && (row = mysql_fetch_row(result)))
{
/* get Slave_IO_Running and Slave_SQL_Running values*/
database->server->slaves[nslaves] = atol(row[0]);
database->server->slaves[nslaves] = atol(row[SLAVE_HOSTS_SERVER_ID]);
nslaves++;
MXS_DEBUG("Found slave at %s:%s", row[1], row[2]);
MXS_DEBUG("Found slave at %s:%s", row[SLAVE_HOSTS_HOSTNAME], row[SLAVE_HOSTS_PORT]);
}
database->server->slaves[nslaves] = 0;
}
@ -667,8 +719,10 @@ monitorDatabase(MONITOR *mon, MONITOR_SERVERS *database)
/* Also clear M/S state in both server and monitor server pending struct */
server_clear_status(database->server, SERVER_SLAVE);
server_clear_status(database->server, SERVER_MASTER);
server_clear_status(database->server, SERVER_RELAY_MASTER);
monitor_clear_pending_status(database, SERVER_SLAVE);
monitor_clear_pending_status(database, SERVER_MASTER);
monitor_clear_pending_status(database, SERVER_RELAY_MASTER);
/* Clean addition status too */
server_clear_status(database->server, SERVER_SLAVE_OF_EXTERNAL_MASTER);
@ -701,16 +755,19 @@ monitorDatabase(MONITOR *mon, MONITOR_SERVERS *database)
server_set_version_string(database->server, server_string);
}
/* get server_id form current node */
if (mysql_query(database->con, "SELECT @@server_id") == 0
MYSQL_SERVER_INFO *serv_info = hashtable_fetch(handle->server_info, database->server->unique_name);
ss_dassert(serv_info);
/* Get server_id and read_only from current node */
if (mysql_query(database->con, "SELECT @@server_id, @@read_only") == 0
&& (result = mysql_store_result(database->con)) != NULL)
{
long server_id = -1;
if (mysql_field_count(database->con) != 1)
if (mysql_field_count(database->con) != 2)
{
mysql_free_result(result);
MXS_ERROR("Unexpected result for 'SELECT @@server_id'. Expected 1 column."
MXS_ERROR("Unexpected result for 'SELECT @@server_id, @@read_only'. Expected 2 columns."
" MySQL Version: %s", version_str);
return;
}
@ -723,7 +780,10 @@ monitorDatabase(MONITOR *mon, MONITOR_SERVERS *database)
{
server_id = -1;
}
database->server->node_id = server_id;
serv_info->server_id = server_id;
serv_info->read_only = (row[1] && strcmp(row[1], "1") == 0);
}
mysql_free_result(result);
}
@ -731,17 +791,17 @@ monitorDatabase(MONITOR *mon, MONITOR_SERVERS *database)
/* Check first for MariaDB 10.x.x and get status for multi-master replication */
if (server_version >= 100000)
{
monitor_mysql100_db(database);
monitor_mysql_db(database, serv_info, MYSQL_SERVER_VERSION_100);
}
else if (server_version >= 5 * 10000 + 5 * 100)
{
monitor_mysql55_db(database);
monitor_mysql_db(database, serv_info, MYSQL_SERVER_VERSION_55);
}
else
{
if (handle->mysql51_replication)
{
monitor_mysql51_db(database);
monitor_mysql_db(database, serv_info, MYSQL_SERVER_VERSION_51);
}
else if (report_version_err)
{
@ -754,6 +814,213 @@ monitorDatabase(MONITOR *mon, MONITOR_SERVERS *database)
}
/**
* @brief A node in a graph
*/
struct graph_node
{
int index;
int lowest_index;
int cycle;
bool active;
struct graph_node *parent;
MYSQL_SERVER_INFO *info;
MONITOR_SERVERS *db;
};
/**
* @brief Visit a node in the graph
*
* This function is the main function used to determine whether the node is a
* part of a cycle. It is an implementation of the Tarjan's strongly connected
* component algorithm. All one node cycles are ignored since normal
* master-slave monitoring handles that.
*
* Tarjan's strongly connected component algorithm:
*
* https://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm
*/
static void visit_node(struct graph_node *node, struct graph_node **stack,
int *stacksize, int *index, int *cycle)
{
/** Assign an index to this node */
node->lowest_index = node->index = *index;
node->active = true;
*index += 1;
stack[*stacksize] = node;
*stacksize += 1;
if (node->parent == NULL)
{
/** This node does not connect to another node, it can't be a part of a cycle */
node->lowest_index = -1;
}
else if (node->parent->index == 0)
{
/** Node has not been visited */
visit_node(node->parent, stack, stacksize, index, cycle);
if (node->parent->lowest_index < node->lowest_index)
{
/** The parent connects to a node with a lower index, this node
could be a part of a cycle. */
node->lowest_index = node->parent->lowest_index;
}
}
else if (node->parent->active)
{
/** This node could be a root node of the cycle */
if (node->parent->index < node->lowest_index)
{
/** Root node found */
node->lowest_index = node->parent->index;
}
}
else
{
/** Node connects to an already connected cycle, it can't be a part of it */
node->lowest_index = -1;
}
if (node->active && node->parent && node->lowest_index > 0)
{
if (node->lowest_index == node->index &&
node->lowest_index == node->parent->lowest_index)
{
/**
* Found a multi-node cycle from the graph. The cycle is formed from the
* nodes with a lowest_index value equal to the lowest_index value of the
* current node. Rest of the nodes on the stack are not part of a cycle
* and can be discarded.
*/
*cycle += 1;
while (*stacksize > 0)
{
struct graph_node *top = stack[(*stacksize) - 1];
top->active = false;
if (top->lowest_index == node->lowest_index)
{
top->cycle = *cycle;
}
*stacksize -= 1;
}
}
}
else
{
/** Pop invalid nodes off the stack */
node->active = false;
*stacksize -= 1;
}
}
/**
* @brief Find the strongly connected components in the replication tree graph
*
* Each replication cluster is a directed graph made out of replication
* trees. If this graph has strongly connected components (more generally
* cycles), it is considered a multi-master cluster due to the fact that there
* are multiple nodes where the data can originate.
*
* Detecting the cycles in the graph allows this monitor to better understand
* the relationships between the nodes. All nodes that are a part of a cycle can
* be labeled as master nodes. This information will later be used to choose the
* right master where the writes should go.
*
* This function also populates the MYSQL_SERVER_INFO structures group
* member. Nodes in a group get a positive group ID where the nodes not in a
* group get a group ID of 0.
*/
void find_graph_cycles(MYSQL_MONITOR *handle, MONITOR_SERVERS *database, int nservers)
{
struct graph_node graph[nservers];
struct graph_node *stack[nservers];
int nodes = 0;
for (MONITOR_SERVERS *db = database; db; db = db->next)
{
graph[nodes].info = hashtable_fetch(handle->server_info, db->server->unique_name);
graph[nodes].db = db;
ss_dassert(graph[nodes].info);
graph[nodes].index = graph[nodes].lowest_index = 0;
graph[nodes].cycle = 0;
graph[nodes].active = false;
graph[nodes].parent = NULL;
nodes++;
}
/** Build the graph */
for (int i = 0; i < nservers; i++)
{
if (graph[i].info->master_id > 0)
{
/** Found a connected node */
for (int k = 0; k < nservers; k++)
{
if (graph[k].info->server_id == graph[i].info->master_id)
{
graph[i].parent = &graph[k];
break;
}
}
}
}
int index = 1;
int cycle = 0;
int stacksize = 0;
for (int i = 0; i < nservers; i++)
{
if (graph[i].index == 0)
{
/** Index is 0, this node has not yet been visited */
visit_node(&graph[i], stack, &stacksize, &index, &cycle);
}
}
for (int i = 0; i < nservers; i++)
{
graph[i].info->group = graph[i].cycle;
if (graph[i].cycle > 0)
{
/** We have at least one cycle in the graph */
if (graph[i].info->read_only)
{
monitor_set_pending_status(graph[i].db, SERVER_SLAVE);
monitor_clear_pending_status(graph[i].db, SERVER_MASTER);
}
else
{
monitor_set_pending_status(graph[i].db, SERVER_MASTER);
monitor_clear_pending_status(graph[i].db, SERVER_SLAVE);
}
}
else if (handle->detectStaleMaster && cycle == 0 &&
graph[i].db->server->status & SERVER_MASTER &&
(graph[i].db->pending_status & SERVER_MASTER) == 0)
{
/**
* Stale master detection is handled here for multi-master mode.
*
* If we know that no cycles were found from the graph and that a
* server once had the master status, replication has broken
* down. These masters are assigned the stale master status allowing
* them to be used as masters even if they lose their slaves. A
* slave in this case can be either a normal slave or another
* master.
*/
monitor_set_pending_status(graph[i].db, SERVER_MASTER | SERVER_STALE_STATUS);
monitor_clear_pending_status(graph[i].db, SERVER_SLAVE);
}
}
}
/**
* The entry point for the monitoring module thread
*
@ -927,6 +1194,32 @@ monitorMain(void *arg)
}
if (handle->multimaster)
{
/** Find all the master server cycles in the cluster graph. If
multiple masters are found, the servers with the read_only
variable set to ON will be assigned the slave status. */
find_graph_cycles(handle, mon->databases, num_servers);
}
ptr = mon->databases;
while (ptr)
{
MYSQL_SERVER_INFO *serv_info = hashtable_fetch(handle->server_info, ptr->server->unique_name);
ss_dassert(serv_info);
if (ptr->server->node_id > 0 && ptr->server->master_id > 0 &&
getSlaveOfNodeId(mon->databases, ptr->server->node_id) &&
getServerByNodeId(mon->databases, ptr->server->master_id) &&
(!handle->multimaster || serv_info->group == 0))
{
/** This server is both a slave and a master i.e. a relay master */
monitor_set_pending_status(ptr, SERVER_RELAY_MASTER);
monitor_clear_pending_status(ptr, SERVER_MASTER);
}
ptr = ptr->next;
}
/* Update server status from monitor pending status on that server*/
ptr = mon->databases;
@ -934,8 +1227,11 @@ monitorMain(void *arg)
{
if (!SERVER_IN_MAINT(ptr->server))
{
/* If "detect_stale_master" option is On, let's use the previous master */
if (detect_stale_master && root_master &&
/** If "detect_stale_master" option is On, let's use the previous master.
*
* Multi-master mode detects the stale masters in find_graph_cycles().
*/
if (detect_stale_master && root_master && !handle->multimaster &&
(strcmp(ptr->server->name, root_master->server->name) == 0 &&
ptr->server->port == root_master->server->port) &&
(ptr->server->status & SERVER_MASTER) &&
@ -1445,6 +1741,14 @@ static MONITOR_SERVERS *get_replication_tree(MONITOR *mon, int num_servers)
add_slave_to_master(master->server->slaves, sizeof(master->server->slaves),
current->node_id);
master->server->depth = current->depth - 1;
if(handle->master && master->server->depth < handle->master->server->depth)
{
/** A master with a lower depth was found, remove
the master status from the previous master. */
monitor_clear_pending_status(handle->master, SERVER_MASTER);
}
monitor_set_pending_status(master, SERVER_MASTER);
handle->master = master;
}