Merge branch '2.2' into develop

This commit is contained in:
Markus Mäkelä
2018-01-03 14:07:09 +02:00
29 changed files with 667 additions and 57 deletions

View File

@ -98,7 +98,7 @@ MXS_MODULE* MXS_CREATE_MODULE()
MXS_AUTHENTICATOR_VERSION,
"The MySQL client to MaxScale authenticator implementation",
"V1.1.0",
MXS_NO_MODULE_CAPABILITIES,
ACAP_TYPE_ASYNC,
&MyObject,
NULL, /* Process init. */
NULL, /* Process finish. */

View File

@ -285,9 +285,8 @@ newSession(MXS_FILTER *instance, MXS_SESSION *session)
MXS_FREE(my_session);
return NULL;
}
sprintf(my_session->filename, "%s.%d", my_instance->filebase,
my_instance->sessions);
atomic_add(&my_instance->sessions, 1);
sprintf(my_session->filename, "%s.%lu", my_instance->filebase, session->ses_id);
my_session->top = (TOPNQ **) MXS_CALLOC(my_instance->topN + 1, sizeof(TOPNQ *));
MXS_ABORT_IF_NULL(my_session->top);
for (i = 0; i < my_instance->topN; i++)

View File

@ -15,7 +15,7 @@
* @file mysql_mon.c - A MySQL replication cluster monitor
*/
#define MXS_MODULE_NAME "mysqlmon"
#define MXS_MODULE_NAME "mariadbmon"
#include "../mysqlmon.h"
#include <string>
@ -3360,7 +3360,7 @@ bool failover_wait_relay_log(MYSQL_MONITOR* mon, MXS_MONITORED_SERVER* new_maste
* @param err_out json object for error printing. Can be NULL.
* @return True if successful
*/
bool promote_new_master(MYSQL_MONITOR* mon, MXS_MONITORED_SERVER* new_master, json_t** err_out)
bool promote_new_master(MXS_MONITORED_SERVER* new_master, json_t** err_out)
{
bool success = false;
MXS_NOTICE("Promoting server '%s' to master.", new_master->server->unique_name);
@ -3478,7 +3478,7 @@ static bool do_failover(MYSQL_MONITOR* mon, json_t** err_out)
// Step 2: Wait until relay log consumed.
if (failover_wait_relay_log(mon, new_master, err_out) &&
// Step 3: Stop and reset slave, set read-only to 0.
promote_new_master(mon, new_master, err_out))
promote_new_master(new_master, err_out))
{
// Step 4: Redirect slaves.
int redirects = redirect_slaves(mon, slaves, new_master);
@ -3714,6 +3714,48 @@ static bool switchover_wait_slave_catchup(MXS_MONITORED_SERVER* slave, const Gti
return gtid_reached;
}
/**
* Wait until slave replication catches up with the master gtid for all slaves in the vector.
*
* @param slave Slaves to wait on
* @param gtid Which gtid must be reached
* @param total_timeout Maximum wait time in seconds
* @param read_timeout The value of read_timeout for the connection
* @param err_out json object for error printing. Can be NULL.
* @return True, if target gtid was reached within allotted time for all servers
*/
static bool switchover_wait_slaves_catchup(const ServerVector& slaves, const Gtid& gtid,
int total_timeout, int read_timeout,
json_t** err_out)
{
bool success = true;
int seconds_remaining = total_timeout;
for (ServerVector::const_iterator iter = slaves.begin();
iter != slaves.end() && success;
iter++)
{
if (seconds_remaining < 0)
{
success = false;
}
else
{
time_t begin = time(NULL);
MXS_MONITORED_SERVER* slave = *iter;
if (switchover_wait_slave_catchup(slave, gtid, seconds_remaining, read_timeout, err_out))
{
seconds_remaining -= difftime(time(NULL), begin);
}
else
{
success = false;
}
}
}
return success;
}
/**
* Starts a new slave connection on a server. Should be used on a demoted master server.
*
@ -3818,28 +3860,48 @@ static bool do_switchover(MYSQL_MONITOR* mon, MXS_MONITORED_SERVER* current_mast
bool rval = false;
MySqlServerInfo* curr_master_info = get_server_info(mon, demotion_target);
// Step 2: Set read-only to 1, flush logs.
if (switchover_demote_master(mon, demotion_target, curr_master_info, err_out) &&
// Step 3: Wait for the selected slave to catch up with master.
switchover_wait_slave_catchup(promotion_target, curr_master_info->gtid_binlog_pos,
mon->switchover_timeout, mon->monitor->read_timeout, err_out) &&
// Step 4: Stop and reset slave, set read-only to 0.
promote_new_master(mon, promotion_target, err_out))
if (switchover_demote_master(mon, demotion_target, curr_master_info, err_out))
{
// Step 5: Redirect slaves.
int redirects = redirect_slaves(mon, slaves, promotion_target);
// Step 6: Set the old master to replicate from the new.
bool start_ok = switchover_start_slave(mon, demotion_target, promotion_target);
rval = slaves.empty() ? start_ok : start_ok || redirects > 0;
if (rval == false)
// Step 3a: Wait for the selected slave to catch up with master.
if (switchover_wait_slave_catchup(promotion_target, curr_master_info->gtid_binlog_pos,
mon->switchover_timeout, mon->monitor->read_timeout, err_out) &&
// Step 3b: Wait for other slaves to catch up with master.
switchover_wait_slaves_catchup(slaves, curr_master_info->gtid_binlog_pos,
mon->switchover_timeout, mon->monitor->read_timeout, err_out) &&
// Step 4: Stop and reset slave, set read-only to 0.
promote_new_master(promotion_target, err_out))
{
// This is a special case. Individual server errors have already been printed to the log.
// For JSON, gather the errors again.
const char MSG[] = "Could not redirect any slaves to the new master.";
MXS_ERROR(MSG);
if (err_out)
// Step 5: Redirect slaves.
int redirects = redirect_slaves(mon, slaves, promotion_target);
// Step 6: Set the old master to replicate from the new.
bool start_ok = switchover_start_slave(mon, demotion_target, promotion_target);
rval = slaves.empty() ? start_ok : start_ok || redirects > 0;
if (rval == false)
{
string combined_error = get_connection_errors(slaves, demotion_target);
*err_out = mxs_json_error_append(*err_out, "%s Errors: %s.", MSG, combined_error.c_str());
// This is a special case. Individual server errors have already been printed to the log.
// For JSON, gather the errors again.
const char MSG[] = "Could not redirect any slaves to the new master.";
MXS_ERROR(MSG);
if (err_out)
{
string combined_error = get_connection_errors(slaves, demotion_target);
*err_out = mxs_json_error_append(*err_out, "%s Errors: %s.", MSG, combined_error.c_str());
}
}
}
else
{
// Step 3a, 3b or 4 failed, try to undo step 2.
const char QUERY_UNDO[] = "SET GLOBAL read_only=0;";
if (mxs_mysql_query(demotion_target->con, QUERY_UNDO) == 0)
{
PRINT_MXS_JSON_ERROR(err_out, "read_only disabled on server %s.",
demotion_target->server->unique_name);
}
else
{
PRINT_MXS_JSON_ERROR(err_out, "Could not disable read_only on server %s: '%s'.",
demotion_target->server->unique_name, mysql_error(demotion_target->con));
}
}
}

View File

@ -26,9 +26,10 @@
#include <string.h>
#include <sys/stat.h>
#include <time.h>
#include <binlog_common.h>
#include <glob.h>
#include <ini.h>
#include <sys/stat.h>
#include <avro/errors.h>
#include <maxscale/alloc.h>
#include <maxscale/atomic.h>
#include <maxscale/dcb.h>
@ -43,6 +44,7 @@
#include <maxscale/service.h>
#include <maxscale/spinlock.h>
#include <maxscale/utils.h>
#include <binlog_common.h>
#ifndef BINLOG_NAMEFMT
#define BINLOG_NAMEFMT "%s.%06d"
@ -111,6 +113,7 @@ bool avro_handle_convert(const MODULECMD_ARG *args, json_t** output)
return rval;
}
static const MXS_ENUM_VALUE codec_values[] =
{
{"null", MXS_AVRO_CODEC_NULL},
@ -120,6 +123,70 @@ static const MXS_ENUM_VALUE codec_values[] =
{NULL}
};
static bool do_unlink(const char* format, ...)
{
va_list args;
va_start(args, format);
char filename[PATH_MAX + 1];
vsnprintf(filename, sizeof(filename), format, args);
va_end(args);
int rc = unlink(filename);
return rc == 0 || rc == ENOENT;
}
static bool do_unlink_with_pattern(const char* format, ...)
{
bool rval = true;
va_list args;
va_start(args, format);
char filename[PATH_MAX + 1];
vsnprintf(filename, sizeof(filename), format, args);
va_end(args);
glob_t g;
int rc = glob(filename, 0, NULL, &g);
if (rc == 0)
{
for (size_t i = 0; i < g.gl_pathc; i++)
{
if (!do_unlink("%s", g.gl_pathv[i]))
{
rval = false;
}
}
}
else if (rc != GLOB_NOMATCH)
{
modulecmd_set_error("Failed to search '%s': %d, %s",
filename, errno, mxs_strerror(errno));
rval = false;
}
globfree(&g);
return rval;
}
static bool avro_handle_purge(const MODULECMD_ARG *args, json_t** output)
{
AVRO_INSTANCE* inst = (AVRO_INSTANCE*)args->argv[0].value.service->router_instance;
// First stop the conversion service
conversion_task_ctl(inst, false);
// Then delete the files
return do_unlink("%s/%s", inst->avrodir, AVRO_PROGRESS_FILE) && // State file
do_unlink("/%s/%s", inst->avrodir, avro_index_name) && // Index database
do_unlink_with_pattern("/%s/*.avro", inst->avrodir) && // .avro files
do_unlink_with_pattern("/%s/*.avsc", inst->avrodir); // .avsc files
}
/**
* The module entry point routine. It is this routine that
* must populate the structure that is referred to as the
@ -133,15 +200,27 @@ MXS_MODULE* MXS_CREATE_MODULE()
spinlock_init(&instlock);
instances = NULL;
static modulecmd_arg_type_t args[] =
static modulecmd_arg_type_t args_convert[] =
{
{ MODULECMD_ARG_SERVICE | MODULECMD_ARG_NAME_MATCHES_DOMAIN, "The avrorouter service" },
{ MODULECMD_ARG_STRING, "Action, whether to 'start' or 'stop' the conversion process" }
};
modulecmd_register_command(MXS_MODULE_NAME, "convert", MODULECMD_TYPE_ACTIVE,
avro_handle_convert, 2, args,
avro_handle_convert, 2, args_convert,
"Start or stop the binlog to avro conversion process");
static modulecmd_arg_type_t args_purge[] =
{
{
MODULECMD_ARG_SERVICE | MODULECMD_ARG_NAME_MATCHES_DOMAIN,
"The avrorouter service to purge (NOTE: THIS REMOVES ALL CONVERTED FILES)"
}
};
modulecmd_register_command(MXS_MODULE_NAME, "purge", MODULECMD_TYPE_ACTIVE,
avro_handle_purge, 1, args_purge,
"Purge created Avro files and reset conversion state. "
"NOTE: MaxScale must be restarted after this call.");
static MXS_ROUTER_OBJECT MyObject =
{
createInstance,

View File

@ -123,10 +123,13 @@ char* json_new_schema_from_table(TABLE_MAP *map)
json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:o}", "name", avro_event_type,
"type", event_types));
for (uint64_t i = 0; i < map->columns; i++)
for (uint64_t i = 0; i < map->columns && i < create->columns; i++)
{
ss_info_dassert(create->column_names[i] && *create->column_names[i],
"Column name should not be empty or NULL");
ss_info_dassert(create->column_types[i] && *create->column_types[i],
"Column type should not be empty or NULL");
json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s, s:s, s:i}",
"name", create->column_names[i],
"type", column_type_to_avro_type(map->column_types[i]),

View File

@ -40,6 +40,7 @@
#include <maxscale/log_manager.h>
#include <maxscale/paths.h>
#include <maxscale/alloc.h>
#include "../../../../core/internal/modules.h"
#include <maxscale/protocol/mysql.h>
#include <ini.h>
@ -95,11 +96,12 @@ int main(int argc, char **argv)
mxs_log_set_priority_enabled(LOG_DEBUG, false);
mxs_log_set_priority_enabled(LOG_INFO, false);
char *lib_dir = getenv("LD_LIBRARY_PATH");
if (lib_dir)
{
set_libdir(MXS_STRDUP_A(lib_dir));
}
set_libdir(MXS_STRDUP_A(".."));
load_module("binlogrouter", MODULE_ROUTER);
set_libdir(MXS_STRDUP_A("../../../protocol/MySQL/MySQLBackend/"));
load_module("MySQLBackend", MODULE_PROTOCOL);
set_libdir(MXS_STRDUP_A("../../../authenticator/MySQLBackendAuth/"));
load_module("MySQLBackendAuth", MODULE_AUTHENTICATOR);
if ((service = service_alloc("test_service", "binlogrouter")) == NULL)
{

View File

@ -507,7 +507,7 @@ closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session)
/** Log routing failure due to closed session */
static void log_closed_session(mxs_mysql_cmd_t mysql_command, bool is_closed,
SERVER_REF *ref)
SERVER_REF *ref, bool valid)
{
char msg[MAX_SERVER_ADDRESS_LEN + 200] = ""; // Extra space for message
@ -523,11 +523,44 @@ static void log_closed_session(mxs_mysql_cmd_t mysql_command, bool is_closed,
{
sprintf(msg, "Server '%s' is in maintenance.", ref->server->unique_name);
}
else if (!valid)
{
sprintf(msg, "Server '%s' no longer qualifies as a target server.",
ref->server->unique_name);
}
MXS_ERROR("Failed to route MySQL command %d to backend server. %s",
mysql_command, msg);
}
/**
* Check if the server we're connected to is still valid
*
* @param inst Router instance
* @param router_cli_ses Router session
*
* @return True if the backend connection is still valid
*/
static inline bool connection_is_valid(ROUTER_INSTANCE* inst, ROUTER_CLIENT_SES* router_cli_ses)
{
bool rval = false;
if (SERVER_IS_RUNNING(router_cli_ses->backend->server) &&
(router_cli_ses->backend->server->status & inst->bitmask & inst->bitvalue))
{
if (inst->bitvalue & SERVER_MASTER)
{
rval = router_cli_ses->backend == get_root_master(inst->service->dbref);
}
else
{
rval = true;
}
}
return rval;
}
/**
* We have data from the client, we must route it to the backend.
* This is simply a case of sending it to the connection that was
@ -571,10 +604,12 @@ routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *queu
rses_end_locked_router_action(router_cli_ses);
}
bool valid;
if (rses_is_closed || backend_dcb == NULL ||
!SERVER_IS_RUNNING(router_cli_ses->backend->server))
(valid = !connection_is_valid(inst, router_cli_ses)))
{
log_closed_session(mysql_command, rses_is_closed, router_cli_ses->backend);
log_closed_session(mysql_command, rses_is_closed, router_cli_ses->backend, valid);
gwbuf_free(queue);
goto return_rc;