Merge branch '2.2' into develop

This commit is contained in:
Markus Mäkelä
2017-11-10 09:40:24 +02:00
22 changed files with 171 additions and 720 deletions

View File

@ -23,6 +23,7 @@
* Binlog router supports MariaDB 10 GTID at both ends.
* KILL CONNECTION can now be used through MaxScale.
* Environment variables can now be used in the MaxScale configuration file.
* By default, MaxScale can no longer be run as root.
For more details, please refer to:
* [MariaDB MaxScale 2.2.0 Release Notes](Release-Notes/MaxScale-2.2.0-Release-Notes.md)

View File

@ -233,14 +233,6 @@ REST API or MaxAdmin.
**Note:** The monitor user must have the SUPER privilege if the failover feature
is enabled.
### `failover_script`
*NOTE* By default, MariaDB MaxScale uses the MariaDB provided failover
script, so `failover_script` need not be specified.
This command will be executed in order to perform a failover. `failover_script`
should be specified the same way as [script](./Monitor-Common.md#script) is.
### `failover_timeout`
The timeout for the cluster failover in seconds. The default value is 90

View File

@ -9,12 +9,33 @@ For any problems you encounter, please consider submitting a bug
report at [Jira](https://jira.mariadb.org).
## Changed Features
### Process identity
By default, MaxScale can no longer be run as `root`, but must be run as some
other user. However, it is possible to start MaxScale as `root`, as long as
the user to run MaxScale as is provided as a command line argument:
```
root@host:~# maxscale --user=maxuser ...
```
If it is imperative to run MaxScale as root, e.g. in a Docker container, it
can be achieved by invoking MaxScale as root and by explicitly specifying
the user to also be root:
```
root@host:~# maxscale --user=root ...
```
### Binlog server
- MariaDB 10 GTID is always enabled for slave connections.
- Automatically set binlog storage to 'tree' mode when
_mariadb10_master_gtid_ option is on.
* The `mariadb10_slave_gtid` parameter was removed and slave connections can now
always register with MariaDB 10 GTID.
* The `binlog_structure` parameter was removed and the binlogs are stored
automatically in 'tree' mode when `mariadb10_master_gtid` is enabled.
* If `mariadb10_master_gtid` is enabled, the `transaction_safety` is
automatically enabled. In MaxScale 2.2.0, if `transaction_safety` was disabled
when `mariadb10_master_gtid` was enabled MaxScale would refuse to start.
## Dropped Features

View File

@ -296,13 +296,13 @@ Example:
```
### `mariadb10_master_gtid`
This option allows MaxScale binlog router to register
with MariaDB 10.X master using GTID instead of _binlog_file_ name
and _position_ in CHANGE MASTER TO admin command.
The user can set a known GTID or an empty value
(in this case the Master server will send events
from it's first available binlog file).
This option allows MaxScale binlog router to register with MariaDB 10.X master
using GTID instead of _binlog_file_ name and _position_ in CHANGE MASTER TO
admin command. This feature is disabled by default.
The user can set a known GTID or an empty value (in this case the Master server
will send events from it's first available binlog file).
Example of MaxScale connection to a MariaDB 10.X Master
@ -316,13 +316,12 @@ MariaDB> CHANGE MASTER TO
MariaDB> START SLAVE;
```
If using GTID request then it's no longer possible to use
MASTER_LOG_FILE and MASTER_LOG_POS in `CHANGE MASTER TO`
command: an error will be reported.
If using GTID request then it's no longer possible to use MASTER_LOG_FILE and
MASTER_LOG_POS in `CHANGE MASTER TO` command: an error will be reported.
The default option value is _Off_, setting it to _On_
automatically sets _mariadb10_slave_gtid_ to _On_
(which enables GTID storage and GTID slave connections)
If this feature is enabled, the _transaction_safety_ option will be
automatically enabled. The binlog files will also be stored in a
hierarchical directory tree instead of a single directory.
**Note:**

View File

@ -32,6 +32,10 @@ ExampleFilterSession* ExampleFilterSession::create(MXS_SESSION* pSession, const
return new ExampleFilterSession(pSession);
}
void ExampleFilterSession::close()
{
}
int ExampleFilterSession::routeQuery(GWBUF* pPacket)
{
return mxs::FilterSession::routeQuery(pPacket);

View File

@ -1,26 +0,0 @@
#pragma once
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2020-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
/**
* @file queuemanager.h The Queue Manager public header
*/
#include <maxscale/cdefs.h>
MXS_BEGIN_DECLS
struct queue_config;
typedef struct queue_config QUEUE_CONFIG;
MXS_END_DECLS

View File

@ -35,7 +35,6 @@
#include <maxscale/hashtable.h>
#include <maxscale/resultset.h>
#include <maxscale/config.h>
#include <maxscale/queuemanager.h>
#include <maxscale/jansson.h>
MXS_BEGIN_DECLS
@ -128,7 +127,6 @@ typedef struct service
int state; /**< The service state */
int client_count; /**< Number of connected clients */
int max_connections; /**< Maximum client connections */
QUEUE_CONFIG *queued_connections; /**< Queued connections, if set */
SERV_LISTENER *ports; /**< Linked list of ports and protocols
* that this service will listen on */
char *routerModule; /**< Name of router module to use */

View File

@ -10,7 +10,7 @@ fi
T="$(date +%s)"
$maxscale_access_sudo maxscale -d
$maxscale_access_sudo maxscale -d -U maxscale
if [ $? -ne 0 ] ; then
exit 1
fi

View File

@ -27,7 +27,8 @@ int main(int argc, char *argv[])
test.tprintf("Starting test");
test.verbose = true;
int rv = test.ssh_maxscale(true, "export maxscale2_API=%s:8989; ./test_maxctrl.sh", test.galera->IP[3]);
int rv = test.ssh_maxscale(true, "export maxscale_access_homedir=%s; export maxscale2_API=%s:8989; ./test_maxctrl.sh",
test.maxscale_access_homedir, test.galera->IP[3]);
test.verbose = false;
test.tprintf("Removing NPM");

View File

@ -20,7 +20,7 @@ fi
cd MaxScale/maxctrl
# Create the scripts that start and stop MaxScale
~/maxctrl_scripts.sh
$maxscale_access_homedir/maxctrl_scripts.sh
chmod +x *.sh
npm i

View File

@ -33,7 +33,6 @@ add_library(maxscale-common SHARED
paths.cc
poll.cc
query_classifier.cc
queuemanager.cc
random_jkiss.cc
resultset.cc
resource.cc

View File

@ -597,7 +597,9 @@ bool runtime_alter_service(SERVICE *service, const char* zKey, const char* zValu
{
valid = true;
// TODO: Once connection queues are implemented, use correct values
serviceSetConnectionLimits(service, i, 0, 0);
const int queued_connections = 0; // At most this many pending connections.
const int timeout = 0; // Wait at most this much for a connection.
serviceSetConnectionLimits(service, i, queued_connections, timeout);
}
}
else if (key == CN_CONNECTION_TIMEOUT)

View File

@ -56,7 +56,6 @@
#include <maxscale/utils.h>
#include "maxscale/modules.h"
#include "maxscale/queuemanager.h"
#include "maxscale/semaphore.hh"
#include "maxscale/session.h"
#include "maxscale/worker.hh"
@ -2435,14 +2434,13 @@ dcb_accept(DCB *listener)
if (client_dcb->service->max_connections &&
client_dcb->service->client_count >= client_dcb->service->max_connections)
{
if (!mxs_enqueue(client_dcb->service->queued_connections, client_dcb))
// TODO: If connections can be queued, this is the place to put the
// TODO: connection on that queue.
if (client_dcb->func.connlimit)
{
if (client_dcb->func.connlimit)
{
client_dcb->func.connlimit(client_dcb, client_dcb->service->max_connections);
}
dcb_close(client_dcb);
client_dcb->func.connlimit(client_dcb, client_dcb->service->max_connections);
}
dcb_close(client_dcb);
client_dcb = NULL;
}
}

View File

@ -107,6 +107,7 @@ const char *progname = NULL;
static struct option long_options[] =
{
{"config-check", no_argument, 0, 'c'},
{"daemon", no_argument, 0, 'n'},
{"nodaemon", no_argument, 0, 'd'},
{"config", required_argument, 0, 'f'},
{"log", required_argument, 0, 'l'},
@ -186,6 +187,7 @@ static void modules_process_finish();
static void disable_module_unloading(const char* arg);
static void enable_module_unloading(const char* arg);
static void redirect_output_to_file(const char* arg);
static bool user_is_acceptable(const char* specified_user);
struct DEBUG_ARGUMENT
{
@ -1367,6 +1369,7 @@ int main(int argc, char **argv)
int numlocks = 0;
bool pid_file_created = false;
Worker* worker;
const char* specified_user = NULL;
config_set_global_defaults();
MXS_CONFIG* cnf = config_get_global_options();
@ -1382,7 +1385,7 @@ int main(int argc, char **argv)
file_write_header(stderr);
// Option string for getopt
const char accepted_opts[] = "dcf:g:l:vVs:S:?L:D:C:B:U:A:P:G:N:E:F:M:H:p";
const char accepted_opts[] = "dncf:g:l:vVs:S:?L:D:C:B:U:A:P:G:N:E:F:M:H:p";
/*<
* Register functions which are called at exit.
@ -1411,8 +1414,13 @@ int main(int argc, char **argv)
switch (opt)
{
case 'n':
/*< Daemon mode, MaxScale forks and parent exits. */
daemon_mode = true;
break;
case 'd':
/*< Debug mode, maxscale runs in this same process */
/*< Non-daemon mode, MaxScale does not fork. */
daemon_mode = false;
break;
@ -1653,7 +1661,8 @@ int main(int argc, char **argv)
}
break;
case 'U':
if (set_user(optarg) != 0)
specified_user = optarg;
if (set_user(specified_user) != 0)
{
succp = false;
}
@ -1694,6 +1703,13 @@ int main(int argc, char **argv)
}
}
if (!user_is_acceptable(specified_user))
{
// Error was logged in user_is_acceptable().
rc = MAXSCALE_INTERNALERROR;
goto return_main;
}
if (config_check)
{
daemon_mode = false;
@ -3204,3 +3220,41 @@ static bool handle_debug_args(char* args)
}
return !arg_error;
}
static bool user_is_acceptable(const char* specified_user)
{
bool acceptable = false;
// This is very early, so we do not have logging available, but write to stderr.
// As this is security related, we want to do as little as possible.
uid_t uid = getuid(); // Always succeeds
errno = 0;
struct passwd *pw = getpwuid(uid);
if (pw)
{
if (strcmp(pw->pw_name, "root") == 0)
{
if (specified_user && (strcmp(specified_user, "root") == 0))
{
// MaxScale was invoked as root and with --user=root.
acceptable = true;
}
else
{
fprintf(stderr, "Error: MaxScale cannot be run as root.\n");
}
}
else
{
acceptable = true;
}
}
else
{
fprintf(stderr, "Error: Could not obtain user information, MaxScale will not run: %s",
strerror(errno));
}
return acceptable;
}

View File

@ -1,54 +0,0 @@
#pragma once
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2020-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
/**
* @file core/maxscale/queuemanager.h - The private queuemanager interface
*/
#include <maxscale/queuemanager.h>
#include <maxscale/spinlock.h>
MXS_BEGIN_DECLS
typedef struct queue_entry
{
void *queued_object;
long heartbeat;
#if defined(SS_DEBUG)
long sequence_check;
#endif /* SS_DEBUG */
} QUEUE_ENTRY;
struct queue_config
{
int queue_limit;
int start;
int end;
int timeout;
bool has_entries;
SPINLOCK queue_lock;
QUEUE_ENTRY *queue_array;
#if defined(SS_DEBUG)
long sequence_number;
#endif /* SS_DEBUG */
};
QUEUE_CONFIG *mxs_queue_alloc(int limit, int timeout);
void mxs_queue_free(QUEUE_CONFIG *queue_config);
bool mxs_enqueue(QUEUE_CONFIG *queue_config, void *new_entry);
bool mxs_dequeue(QUEUE_CONFIG *queue_config, QUEUE_ENTRY *result);
bool mxs_dequeue_if_expired(QUEUE_CONFIG *queue_config, QUEUE_ENTRY *result);
MXS_END_DECLS

View File

@ -1,229 +0,0 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2020-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
/**
* @file queuemanager.c - Logic for FIFO queue handling
*
* MaxScale contains a number of FIFO queues. This code attempts to provide
* standard functions for handling them.
*
* @verbatim
* Revision History
*
* Date Who Description
* 27/04/16 Martin Brampton Initial implementation
*
* @endverbatim
*/
#include <maxscale/queuemanager.h>
#include <stdlib.h>
#include <stdio.h>
#include <maxscale/alloc.h>
#include <maxscale/debug.h>
#include <maxscale/hk_heartbeat.h>
#include <maxscale/log_manager.h>
#include <maxscale/spinlock.h>
#include "maxscale/queuemanager.h"
#if defined(SS_DEBUG)
int debug_check_fail = 0;
#endif /* SS_DEBUG */
static inline int mxs_queue_count(QUEUE_CONFIG*);
/**
* @brief Allocate a new queue
*
* Provides for FIFO queues, this is the first operation to be requested
* for the use of a queue.
*
* @param limit The maximum size of the queue
* @param timeout The maximum time for which an entry is valid
* @return QUEUE_CONFIG A queue configuration and anchor structure
*/
QUEUE_CONFIG
*mxs_queue_alloc(int limit, int timeout)
{
QUEUE_CONFIG *new_queue = (QUEUE_CONFIG *)MXS_CALLOC(1, sizeof(QUEUE_CONFIG));
if (new_queue)
{
new_queue->queue_array = (QUEUE_ENTRY*)MXS_CALLOC(limit + 1, sizeof(QUEUE_ENTRY));
if (new_queue->queue_array)
{
new_queue->queue_limit = limit;
new_queue->timeout = timeout;
spinlock_init(&new_queue->queue_lock);
#if defined(SS_DEBUG)
new_queue->sequence_number = 0;
#endif /* SS_DEBUG */
return new_queue;
}
MXS_FREE(new_queue);
}
return NULL;
}
/**
* @brief Free a queue configuration
*
* Provides for FIFO queues, this is the last operation to be requested, when
* there is no further use for the queue.
*
* @param QUEUE_CONFIG A queue configuration and anchor structure
*/
void mxs_queue_free(QUEUE_CONFIG *queue_config)
{
if (queue_config)
{
MXS_FREE(queue_config->queue_array);
MXS_FREE(queue_config);
}
}
/**
* @brief Add an item to a queue
*
* Add a new item to a FIFO queue. If the queue config is null, this function
* will behave as if the queue is full.
*
* @param queue_config The configuration and anchor structure for the queue
* @param new_entry The new entry, to be added
* @return bool Whether the enqueue succeeded
*/
bool mxs_enqueue(QUEUE_CONFIG *queue_config, void *new_entry)
{
bool result = false;
if (queue_config)
{
spinlock_acquire(&queue_config->queue_lock);
if (mxs_queue_count(queue_config) < queue_config->queue_limit)
{
queue_config->queue_array[queue_config->end].queued_object = new_entry;
queue_config->queue_array[queue_config->end].heartbeat = hkheartbeat;
#if defined(SS_DEBUG)
queue_config->queue_array[queue_config->end].sequence_check = queue_config->sequence_number;
queue_config->sequence_number++;
#endif /* SS_DEBUG */
queue_config->end++;
if (queue_config->end > queue_config->queue_limit)
{
queue_config->end = 0;
}
queue_config->has_entries = true;
result = true;
}
else
{
result = false;
}
spinlock_release(&queue_config->queue_lock);
}
return result;
}
/**
* @brief Remove an item from a queue
*
* Remove an item from a FIFO queue. If the queue config is NULL, the function
* will behave as if for an empty queue.
*
* @param queue_config The configuration and anchor structure for the queue
* @param result A queue entry structure that will receive the result
* @return bool indicating whether an item was successfully dequeued
*/
bool mxs_dequeue(QUEUE_CONFIG *queue_config, QUEUE_ENTRY *result)
{
QUEUE_ENTRY *found = NULL;
if (queue_config && queue_config->has_entries)
{
spinlock_acquire(&queue_config->queue_lock);
if (mxs_queue_count(queue_config) > 0)
{
found = &(queue_config->queue_array[queue_config->start]);
#if defined(SS_DEBUG)
ss_dassert((queue_config->sequence_number) == (found->sequence_check + mxs_queue_count(queue_config)));
if ((queue_config->sequence_number) != (found->sequence_check + mxs_queue_count(queue_config)))
{
debug_check_fail++;
}
#endif /* SS_DEBUG */
result->heartbeat = found->heartbeat;
result->queued_object = found->queued_object;
if (++queue_config->start > queue_config->queue_limit)
{
queue_config->start = 0;
}
queue_config->has_entries = (mxs_queue_count(queue_config) > 0);
}
spinlock_release(&queue_config->queue_lock);
}
return (found != NULL);
}
/**
* @brief Remove an item from a queue if it has passed the timeout limit
*
* Remove an item from a FIFO queue if expired. If the queue config is NULL,
* the function will behave as for an empty queue.
*
* @param queue_config The configuration and anchor structure for the queue
* @param result A queue entry structure that will receive the result
* @return bool indicating whether an item was successfully dequeued
*/
bool mxs_dequeue_if_expired(QUEUE_CONFIG *queue_config, QUEUE_ENTRY *result)
{
QUEUE_ENTRY *found = NULL;
if (queue_config && queue_config->has_entries)
{
spinlock_acquire(&queue_config->queue_lock);
if (mxs_queue_count(queue_config) > 0)
{
found = &(queue_config->queue_array[queue_config->start]);
if (found->heartbeat > hkheartbeat - queue_config->timeout)
{
found = NULL;
}
else
{
#if defined(SS_DEBUG)
ss_dassert((queue_config->sequence_number) == (found->sequence_check + mxs_queue_count(queue_config)));
if ((queue_config->sequence_number) != (found->sequence_check + mxs_queue_count(queue_config)))
{
debug_check_fail++;
}
#endif /* SS_DEBUG */
result->heartbeat = found->heartbeat;
result->queued_object = found->queued_object;
if (++queue_config->start > queue_config->queue_limit)
{
queue_config->start = 0;
}
queue_config->has_entries = (mxs_queue_count(queue_config) > 0);
}
}
spinlock_release(&queue_config->queue_lock);
}
return (found != NULL);
}
static inline int mxs_queue_count(QUEUE_CONFIG *queue_config)
{
int count = queue_config->end - queue_config->start;
return count < 0 ? (count + queue_config->queue_limit + 1) : count;
}

View File

@ -39,7 +39,6 @@
#include <maxscale/log_manager.h>
#include <maxscale/poll.h>
#include <maxscale/protocol.h>
#include <maxscale/queuemanager.h>
#include <maxscale/resultset.h>
#include <maxscale/router.h>
#include <maxscale/server.h>
@ -54,7 +53,6 @@
#include "maxscale/config.h"
#include "maxscale/filter.h"
#include "maxscale/modules.h"
#include "maxscale/queuemanager.h"
#include "maxscale/service.h"
/** This define is needed in CentOS 6 systems */
@ -97,7 +95,6 @@ static int find_type(typelib_t* tl, const char* needle, int maxlen);
static void service_add_qualified_param(SERVICE* svc,
MXS_CONFIG_PARAMETER* param);
static void service_internal_restart(void *data);
static void service_queue_check(void *data);
static void service_calculate_weights(SERVICE *service);
SERVICE* service_alloc(const char *name, const char *router)
@ -145,7 +142,6 @@ SERVICE* service_alloc(const char *name, const char *router)
service->name = my_name;
service->routerModule = my_router;
service->users_from_all = false;
service->queued_connections = NULL;
service->localhost_match_wildcard_host = SERVICE_PARAM_UNINIT;
service->retry_start = true;
service->conn_idle_timeout = SERVICE_NO_SESSION_TIMEOUT;
@ -1157,6 +1153,8 @@ void serviceSetVersionString(SERVICE *service, const char* value)
* @param max The maximum number of client connections at any one time
* @param queued The maximum number of connections to queue up when
* max_connections clients are already connected
* @param timeout Maximum amount of time to wait for a connection to
* become available.
* @return 1 on success, 0 when the values are invalid
*/
int
@ -1169,45 +1167,13 @@ serviceSetConnectionLimits(SERVICE *service, int max, int queued, int timeout)
}
service->max_connections = max;
if (queued && timeout)
{
char callback_name[100];
sprintf(callback_name, "Check queued connections %p", service);
/* If memory allocation fails, result will be null so no queue */
service->queued_connections = mxs_queue_alloc(queued, timeout);
if (service->queued_connections)
{
hktask_add(callback_name, service_queue_check, (void *)service->queued_connections, 1);
}
}
ss_info_dassert(queued == 0, "Queued connections not implemented.");
ss_info_dassert(timeout == 0, "Queued connections not implemented.");
return 1;
}
/*
* @brief The callback function triggered by housekeeping every second
*
* This function removes any expired connection requests from the queue, and
* sends an error message "too many connections" for them.
*
* @param The parameter provided by the callback is the queue config
*/
static void
service_queue_check(void *data)
{
QUEUE_ENTRY expired;
QUEUE_CONFIG *queue_config = (QUEUE_CONFIG *)data;
/* The queued connections are in a FIFO queue, so we only look at the */
/* start of the queue, and remove any expired entries. As soon as this */
/* returns nothing, we stop. */
while ((mxs_dequeue_if_expired(queue_config, &expired)))
{
DCB *dcb = (DCB *)expired.queued_object;
dcb->func.connlimit(dcb, queue_config->queue_limit);
dcb_close(dcb);
}
}
/**
* Enable or disable the restarting of the service on failure.
* @param service Service to configure

View File

@ -10,7 +10,6 @@ add_executable(test_logorder testlogorder.cc)
add_executable(test_logthrottling testlogthrottling.cc)
add_executable(test_modutil testmodutil.cc)
add_executable(test_poll testpoll.cc)
add_executable(test_queuemanager testqueuemanager.cc)
add_executable(test_semaphore testsemaphore.cc)
add_executable(test_server testserver.cc)
add_executable(test_service testservice.cc)
@ -38,7 +37,6 @@ target_link_libraries(test_logorder maxscale-common)
target_link_libraries(test_logthrottling maxscale-common)
target_link_libraries(test_modutil maxscale-common)
target_link_libraries(test_poll maxscale-common)
target_link_libraries(test_queuemanager maxscale-common)
target_link_libraries(test_semaphore maxscale-common)
target_link_libraries(test_server maxscale-common)
target_link_libraries(test_service maxscale-common)
@ -68,7 +66,6 @@ add_test(TestMaxScalePCRE2 testmaxscalepcre2)
add_test(TestModutil test_modutil)
add_test(NAME TestMaxPasswd COMMAND ${CMAKE_CURRENT_SOURCE_DIR}/testmaxpasswd.sh)
add_test(TestPoll test_poll)
add_test(TestQueueManager test_queuemanager)
add_test(TestSemaphore test_semaphore)
add_test(TestServer test_server)
add_test(TestService test_service)

View File

@ -1,259 +0,0 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2020-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
/**
*
* @verbatim
* Revision History
*
* Date Who Description
* 21/06/2016 Martin Brampton Initial implementation
*
* @endverbatim
*/
// To ensure that ss_info_assert asserts also when builing in non-debug mode.
#if !defined(SS_DEBUG)
#define SS_DEBUG
int debug_check_fail = 1;
#else
// This is defined in the queuemanager code but only in debug builds
extern int debug_check_fail;
#endif
#if defined(NDEBUG)
#undef NDEBUG
#endif
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <maxscale/random_jkiss.h>
#include <maxscale/hk_heartbeat.h>
#include <maxscale/alloc.h>
#include "../maxscale/queuemanager.h"
#include "test_utils.h"
/**
* test1 Allocate a queue and do lots of other things
*
*/
#define TEST_QUEUE_SIZE 5
#define HEARTBEATS_TO_EXPIRE 3
#define NUMBER_OF_THREADS 4
#define THREAD_TEST_COUNT 1000000
static QUEUE_CONFIG *thread_queue;
static int
test1()
{
QUEUE_CONFIG *queue;
int filled = 0;
int emptied = 0;
int expired = 0;
int input_counter = 0;
int output_counter = 0;
random_jkiss_init();
hkheartbeat = 0;
queue = mxs_queue_alloc(TEST_QUEUE_SIZE, HEARTBEATS_TO_EXPIRE);
{
QUEUE_ENTRY entry;
if (mxs_dequeue(queue, &entry))
{
ss_dfprintf(stderr, "\nError mxs_dequeue on empty queue did not return false.\n");
return 1;
}
if (mxs_dequeue_if_expired(queue, &entry))
{
ss_dfprintf(stderr, "\nError mxs_dequeue_if_expired on empty queue did not return false.\n");
return 1;
}
}
while (filled < 250 || emptied < 250 || expired < 250)
{
ss_dfprintf(stderr, "Input counter %d and output counter %d\n", input_counter, output_counter);
ss_dfprintf(stderr, "Difference between counters %d\n", input_counter - output_counter);
ss_dfprintf(stderr, "Filled %d, emptied %d, expired %d\n", filled, emptied, expired);
if (random_jkiss() % 2)
{
int *entrynumber = (int*)MXS_MALLOC(sizeof(int));
*entrynumber = input_counter;
if (mxs_enqueue(queue, entrynumber))
{
input_counter++;
if ((input_counter - output_counter) > TEST_QUEUE_SIZE)
{
ss_dfprintf(stderr, "\nQueue full, but mxs_enqueue accepted entry.\n");
return 3;
}
}
else
{
QUEUE_ENTRY entry;
if ((input_counter - output_counter) != TEST_QUEUE_SIZE)
{
ss_dfprintf(stderr, "\nFailed enqueue, but input counter %d and output counter %d do not differ by %d.\n",
input_counter,
output_counter,
TEST_QUEUE_SIZE);
return 4;
}
filled++;
if (0 == (random_jkiss() % 5))
{
if ((mxs_dequeue_if_expired(queue, &entry)))
{
if ((entry.heartbeat) > (hkheartbeat - HEARTBEATS_TO_EXPIRE))
{
ss_dfprintf(stderr, "\nReturned an expired entry even though none or not expired.\n");
return 5;
}
if (*(int *)entry.queued_object != output_counter)
{
ss_dfprintf(stderr, "\nOutput counter was %d, but dequeue gave %d.\n",
output_counter,
*(int *)entry.queued_object);
return 10;
}
output_counter++;
MXS_FREE(entry.queued_object);
}
else
{
hkheartbeat += (HEARTBEATS_TO_EXPIRE + 1);
if (mxs_dequeue_if_expired(queue, &entry))
{
if (*(int *)entry.queued_object != output_counter)
{
ss_dfprintf(stderr, "\nOutput counter was %d, but dequeue gave %d.\n",
output_counter,
*(int *)entry.queued_object);
return 6;
}
output_counter++;
MXS_FREE(entry.queued_object);
}
else
{
ss_dfprintf(stderr, "\nReturned no expired entry even though all are expired.\n");
return 7;
}
expired++;
}
}
}
}
else
{
QUEUE_ENTRY entry;
if (mxs_dequeue(queue, &entry))
{
if (*(int *)entry.queued_object != output_counter)
{
ss_dfprintf(stderr, "\nOutput counter was %d, but dequeue gave %d.\n",
output_counter,
*(int *)entry.queued_object);
return 8;
}
output_counter++;
MXS_FREE(entry.queued_object);
}
else
{
if (input_counter != output_counter)
{
ss_dfprintf(stderr, "\nNULL from dequeue, but input counter %d and output counter %d.\n",
input_counter,
output_counter);
return 9;
}
emptied++;
}
}
}
ss_dfprintf(stderr, "Successfully ended test\n");
mxs_queue_free(queue);
return 0;
}
static void *
thread_test(void *arg)
{
int i;
QUEUE_ENTRY entry;
int emptied = 0;
int filled = 0;
for (i = 0; i < THREAD_TEST_COUNT; i++)
{
if (random_jkiss() % 2)
{
if (!mxs_enqueue(thread_queue, (void *)"Just for test"))
{
filled++;
}
}
else
{
if (!mxs_dequeue(thread_queue, &entry))
{
emptied++;
}
}
}
ss_dfprintf(stderr, "Queue was full %d times, empty %d times\n", filled, emptied);
return NULL;
}
static int
test2()
{
pthread_t tid[NUMBER_OF_THREADS];
int err, i, limit;
thread_queue = mxs_queue_alloc(TEST_QUEUE_SIZE, HEARTBEATS_TO_EXPIRE);
limit = NUMBER_OF_THREADS;
for (i = 0; i < limit; i++)
{
err = pthread_create(&tid[i], NULL, thread_test, NULL);
ss_info_dassert((0 == err), "Must create threads successfully");
}
for (i = 0; i < limit; i++)
{
err = pthread_join(tid[i], NULL);
ss_info_dassert((0 == err), "Must join threads successfully");
ss_dfprintf(stderr, "\nThread %d ended with debug check fail at %d.\n", i, debug_check_fail);
}
mxs_queue_free(thread_queue);
return debug_check_fail ? 1 : 0;
}
int main(int argc, char **argv)
{
int result = 0;
result += (test1() ? 1 : 0);
result += (test2() ? 1 : 0);
exit(result);
}

View File

@ -64,7 +64,6 @@ typedef struct
bool warn_set_standalone_master; /**< Log a warning when setting standalone master */
bool allow_external_slaves; /**< Whether to allow usage of external slave servers */
bool failover; /**< If master failover is enabled */
char* failover_script; /**< Script to call for performing master failover */
uint32_t failover_timeout; /**< Timeout in seconds for the master failover */
bool switchover; /**< If master switchover is enabled */
char* switchover_script; /**< Script to call for performing master switchover */

View File

@ -75,18 +75,15 @@ static void set_slave_heartbeat(MXS_MONITOR *, MXS_MONITORED_SERVER *);
static int add_slave_to_master(long *, int, long);
static bool isMySQLEvent(mxs_monitor_event_t event);
void check_maxscale_schema_replication(MXS_MONITOR *monitor);
static bool mon_process_failover(MYSQL_MONITOR* monitor,
const char* failover_script,
uint32_t failover_timeout);
static bool mon_process_failover(MYSQL_MONITOR* monitor, uint32_t failover_timeout);
static bool do_failover(MYSQL_MONITOR* mon);
static void update_gtid_slave_pos(MXS_MONITORED_SERVER *database, int64_t domain, MySqlServerInfo* info);
static bool update_gtid_slave_pos(MXS_MONITORED_SERVER *database, int64_t domain, MySqlServerInfo* info);
static bool update_replication_settings(MXS_MONITORED_SERVER *database, MySqlServerInfo* info);
static bool report_version_err = true;
static const char* hb_table_name = "maxscale_schema.replication_heartbeat";
static const char CN_FAILOVER[] = "failover";
static const char CN_FAILOVER_SCRIPT[] = "failover_script";
static const char CN_FAILOVER_TIMEOUT[] = "failover_timeout";
static const char CN_SWITCHOVER[] = "switchover";
static const char CN_SWITCHOVER_SCRIPT[] = "switchover_script";
@ -109,14 +106,6 @@ static const char CN_REPLICATION_PASSWORD[] = "replication_password";
/** Default master failure verification timeout */
#define DEFAULT_MASTER_FAILURE_TIMEOUT "10"
// TODO: Specify the real default failover script.
static const char DEFAULT_FAILOVER_SCRIPT[] =
"/usr/bin/echo INITIATOR=$INITIATOR "
"PARENT=$PARENT CHILDREN=$CHILDREN EVENT=$EVENT "
"CREDENTIALS=$CREDENTIALS NODELIST=$NODELIST "
"LIST=$LIST MASTERLIST=$MASTERLIST "
"SLAVELIST=$SLAVELIST SYNCEDLIST=$SYNCEDLIST";
// TODO: Specify the real default switchover script.
static const char DEFAULT_SWITCHOVER_SCRIPT[] =
"/usr/bin/echo CURRENT_MASTER=$CURRENT_MASTER NEW_MASTER=$NEW_MASTER "
@ -550,12 +539,6 @@ MXS_MODULE* MXS_CREATE_MODULE()
mxs_monitor_event_enum_values
},
{CN_FAILOVER, MXS_MODULE_PARAM_BOOL, "false"},
{
CN_FAILOVER_SCRIPT,
MXS_MODULE_PARAM_PATH,
NULL,
MXS_MODULE_OPT_PATH_X_OK
},
{CN_FAILOVER_TIMEOUT, MXS_MODULE_PARAM_COUNT, DEFAULT_FAILOVER_TIMEOUT},
{CN_SWITCHOVER, MXS_MODULE_PARAM_BOOL, "false"},
{
@ -623,6 +606,10 @@ public:
ss_dassert(found);
}
}
bool operator == (const Gtid& rhs) const
{
return domain == rhs.domain && server_id == rhs.server_id && sequence == rhs.sequence;
}
private:
void parse_triplet(const char* str)
{
@ -855,7 +842,6 @@ startMonitor(MXS_MONITOR *monitor, const MXS_CONFIG_PARAMETER* params)
handle->events = config_get_enum(params, "events", mxs_monitor_event_enum_values);
handle->allow_external_slaves = config_get_bool(params, "allow_external_slaves");
handle->failover = config_get_bool(params, CN_FAILOVER);
handle->failover_script = config_copy_string(params, CN_FAILOVER_SCRIPT);
handle->failover_timeout = config_get_integer(params, CN_FAILOVER_TIMEOUT);
handle->switchover = config_get_bool(params, CN_SWITCHOVER);
handle->switchover_script = config_copy_string(params, CN_SWITCHOVER_SCRIPT);
@ -1181,8 +1167,8 @@ static bool do_show_slave_status(MySqlServerInfo* serv_info, MXS_MONITORED_SERVE
{
const char* beats = mxs_mysql_get_value(result, row, "Slave_received_heartbeats");
const char* period = mxs_mysql_get_value(result, row, "Slave_heartbeat_period");
const char* gtid_io_pos = mxs_mysql_get_value(result, row, "Gtid_IO_Pos");
ss_dassert(beats && period);
const char* using_gtid = mxs_mysql_get_value(result, row, "Using_Gtid");
ss_dassert(beats && period && using_gtid);
int heartbeats = atoi(beats);
if (serv_info->slave_heartbeats < heartbeats)
@ -1191,17 +1177,12 @@ static bool do_show_slave_status(MySqlServerInfo* serv_info, MXS_MONITORED_SERVE
serv_info->slave_heartbeats = heartbeats;
serv_info->heartbeat_period = atof(period);
}
if (serv_info->slave_status.slave_sql_running && gtid_io_pos)
if (strcmp(using_gtid, "Slave_Pos") == 0)
{
Gtid io_pos = Gtid(gtid_io_pos);
serv_info->slave_status.gtid_io_pos = io_pos;
update_gtid_slave_pos(database, io_pos.domain, serv_info);
}
else
{
serv_info->slave_status.gtid_io_pos = Gtid();
serv_info->gtid_slave_pos = Gtid();
const char* gtid_io_pos = mxs_mysql_get_value(result, row, "Gtid_IO_Pos");
ss_dassert(gtid_io_pos);
serv_info->slave_status.gtid_io_pos = gtid_io_pos[0] != '\0' ?
Gtid(gtid_io_pos) : Gtid();
}
}
@ -1215,7 +1196,6 @@ static bool do_show_slave_status(MySqlServerInfo* serv_info, MXS_MONITORED_SERVE
/** Query returned no rows, replication is not configured */
serv_info->slave_configured = false;
serv_info->slave_heartbeats = 0;
serv_info->gtid_slave_pos = Gtid();
serv_info->slave_status = SlaveStatusInfo();
}
@ -1751,13 +1731,13 @@ void find_graph_cycles(MYSQL_MONITOR *handle, MXS_MONITORED_SERVER *database, in
/** We have at least one cycle in the graph */
if (graph[i].info->read_only)
{
monitor_set_pending_status(graph[i].db, SERVER_SLAVE);
monitor_set_pending_status(graph[i].db, SERVER_SLAVE | SERVER_STALE_SLAVE);
monitor_clear_pending_status(graph[i].db, SERVER_MASTER);
}
else
{
monitor_set_pending_status(graph[i].db, SERVER_MASTER);
monitor_clear_pending_status(graph[i].db, SERVER_SLAVE);
monitor_clear_pending_status(graph[i].db, SERVER_SLAVE | SERVER_STALE_SLAVE);
}
}
else if (handle->detectStaleMaster && cycle == 0 &&
@ -1777,13 +1757,13 @@ void find_graph_cycles(MYSQL_MONITOR *handle, MXS_MONITORED_SERVER *database, in
if (graph[i].info->read_only)
{
/** The master is in read-only mode, set it into Slave state */
monitor_set_pending_status(graph[i].db, SERVER_SLAVE);
monitor_set_pending_status(graph[i].db, SERVER_SLAVE | SERVER_STALE_SLAVE);
monitor_clear_pending_status(graph[i].db, SERVER_MASTER | SERVER_STALE_STATUS);
}
else
{
monitor_set_pending_status(graph[i].db, SERVER_MASTER | SERVER_STALE_STATUS);
monitor_clear_pending_status(graph[i].db, SERVER_SLAVE);
monitor_clear_pending_status(graph[i].db, SERVER_SLAVE | SERVER_STALE_SLAVE);
}
}
}
@ -2126,13 +2106,13 @@ monitorMain(void *arg)
/** Slave with a running master, assign stale slave candidacy */
if ((ptr->pending_status & bits) == bits)
{
ptr->pending_status |= SERVER_STALE_SLAVE;
monitor_set_pending_status(ptr, SERVER_STALE_SLAVE);
}
/** Server lost slave when a master is available, remove
* stale slave candidacy */
else if ((ptr->pending_status & bits) == SERVER_RUNNING)
{
ptr->pending_status &= ~SERVER_STALE_SLAVE;
monitor_clear_pending_status(ptr, SERVER_STALE_SLAVE);
}
}
/** If this server was a stale slave candidate, assign
@ -2145,11 +2125,11 @@ monitorMain(void *arg)
(SERVER_IS_MASTER(root_master->server) &&
(root_master->mon_prev_status & SERVER_MASTER) == 0)))
{
ptr->pending_status |= SERVER_SLAVE;
monitor_set_pending_status(ptr, SERVER_SLAVE);
}
else if (root_master == NULL && serv_info->slave_configured)
{
ptr->pending_status |= SERVER_SLAVE;
monitor_set_pending_status(ptr, SERVER_SLAVE);
}
}
@ -2173,6 +2153,12 @@ monitorMain(void *arg)
}
}
if (root_master)
{
// Clear slave and stale slave status bits from current master
monitor_clear_pending_status(root_master, SERVER_SLAVE | SERVER_STALE_SLAVE);
}
/**
* After updating the status of all servers, check if monitor events
* need to be launched.
@ -2181,13 +2167,6 @@ monitorMain(void *arg)
if (handle->failover)
{
const char* failover_script = handle->failover_script;
if (!failover_script)
{
failover_script = DEFAULT_FAILOVER_SCRIPT;
}
if (failover_not_possible(handle))
{
MXS_ERROR("Failover is not possible due to one or more problems in "
@ -2202,7 +2181,7 @@ monitorMain(void *arg)
{
MXS_INFO("Master failure not yet confirmed by slaves, delaying failover.");
}
else if (!mon_process_failover(handle, failover_script, handle->failover_timeout))
else if (!mon_process_failover(handle, handle->failover_timeout))
{
MXS_ALERT("Failed to perform failover, disabling failover functionality. "
"To enable failover functionality, manually set 'failover' to "
@ -2979,7 +2958,6 @@ void check_maxscale_schema_replication(MXS_MONITOR *monitor)
* This function should be called immediately after @c mon_process_state_changes.
*
* @param monitor Monitor whose cluster is processed
* @param failover_script The script to be used for performing the failover.
* @param failover_timeout Timeout in seconds for the failover
*
* @return True on success, false on error
@ -2987,7 +2965,7 @@ void check_maxscale_schema_replication(MXS_MONITOR *monitor)
* @todo Currently this only works with flat replication topologies and
* needs to be moved inside mysqlmon as it is MariaDB specific code.
*/
bool mon_process_failover(MYSQL_MONITOR* monitor, const char* failover_script, uint32_t failover_timeout)
bool mon_process_failover(MYSQL_MONITOR* monitor, uint32_t failover_timeout)
{
bool rval = true;
MXS_CONFIG* cnf = config_get_global_options();
@ -3065,7 +3043,9 @@ MXS_MONITORED_SERVER* failover_select_new_master(MYSQL_MONITOR* mon, ServerVecto
mon_server = mon_server->next)
{
MySqlServerInfo* cand_info = get_server_info(mon, mon_server);
if (cand_info->slave_status.slave_sql_running && update_replication_settings(mon_server, cand_info))
if (cand_info->slave_status.slave_sql_running &&
update_replication_settings(mon_server, cand_info) &&
update_gtid_slave_pos(mon_server, cand_info->slave_status.gtid_io_pos.domain, cand_info))
{
if (out_slaves)
{
@ -3149,15 +3129,20 @@ bool failover_wait_relay_log(MYSQL_MONITOR* mon, MXS_MONITORED_SERVER* new_maste
MySqlServerInfo* master_info = get_server_info(mon, new_master);
time_t begin = time(NULL);
bool query_ok = true;
bool io_pos_stable = true;
while (master_info->relay_log_events() > 0 &&
query_ok &&
io_pos_stable &&
difftime(time(NULL), begin) < mon->failover_timeout)
{
MXS_NOTICE("Failover: Relay log of server '%s' not yet empty, waiting to clear %" PRId64 " events.",
new_master->server->unique_name, master_info->relay_log_events());
thread_millisleep(1000); // Sleep for a while before querying server again.
// Todo: check server version before entering failover.
query_ok = do_show_slave_status(master_info, new_master, MYSQL_SERVER_VERSION_100);
Gtid old_gtid_io_pos = master_info->slave_status.gtid_io_pos;
query_ok = do_show_slave_status(master_info, new_master, MYSQL_SERVER_VERSION_100) &&
update_gtid_slave_pos(new_master, old_gtid_io_pos.domain, master_info);
io_pos_stable = (old_gtid_io_pos == master_info->slave_status.gtid_io_pos);
}
bool rval = false;
@ -3167,9 +3152,17 @@ bool failover_wait_relay_log(MYSQL_MONITOR* mon, MXS_MONITORED_SERVER* new_maste
}
else
{
MXS_ALERT("Failover: %s while waiting for server '%s' to process relay log.",
query_ok ? "Timeout" : "Status query error",
new_master->server->unique_name);
const char* reason = "Timeout";
if (!query_ok)
{
reason = "Query error";
}
else if (!io_pos_stable)
{
reason = "Old master sent new event(s)";
}
MXS_ERROR("Failover: %s while waiting for server '%s' to process relay log. Cancelling failover.",
reason, new_master->server->unique_name);
rval = false;
}
return rval;
@ -3349,12 +3342,15 @@ static bool update_replication_settings(MXS_MONITORED_SERVER *database, MySqlSer
* @param database The server to query.
* @param domain Which gtid domain should be saved.
* @param info Server info structure for saving result.
* @return True if successful
*/
static void update_gtid_slave_pos(MXS_MONITORED_SERVER *database, int64_t domain, MySqlServerInfo* info)
static bool update_gtid_slave_pos(MXS_MONITORED_SERVER *database, int64_t domain, MySqlServerInfo* info)
{
StringVector row;
if (query_one_row(database, "SELECT @@gtid_slave_pos;", 1, &row))
bool rval = query_one_row(database, "SELECT @@gtid_slave_pos;", 1, &row);
if (rval)
{
info->gtid_slave_pos = Gtid(row.front().c_str(), domain);
}
return rval;
}

View File

@ -770,6 +770,8 @@ createInstance(SERVICE *service, char **options)
{
/* Force GTID slave request handling */
inst->mariadb10_gtid = true;
/* Force transaction safety */
inst->trx_safe = true;
/* Force binlog storage as tree */
inst->storage_type = BLR_BINLOG_STORAGE_TREE;
}
@ -782,18 +784,8 @@ createInstance(SERVICE *service, char **options)
"'tree' mode using GTID domain_id and server_id");
/* Enable MariaDB the GTID maps store */
if (inst->mariadb10_compat &&
inst->mariadb10_gtid)
if (inst->mariadb10_compat)
{
if (!inst->trx_safe)
{
MXS_ERROR("MariaDB GTID can be enabled only"
" with Transaction Safety feature."
" Please enable it with option 'transaction_safety = on'");
free_instance(inst);
return NULL;
}
/* Create/Open R/W GTID sqlite3 storage */
if (!blr_open_gtid_maps_storage(inst))
{