Merge branch '2.1' into 2.2
This commit is contained in:
@ -29,7 +29,6 @@
|
||||
#include <sys/time.h>
|
||||
#include "internal/skygw_utils.h"
|
||||
#include <maxscale/atomic.h>
|
||||
#include <maxscale/random_jkiss.h>
|
||||
#include <pcre2.h>
|
||||
|
||||
#if !defined(PATH_MAX)
|
||||
@ -460,7 +459,7 @@ void acquire_lock(int* l)
|
||||
misscount += 1;
|
||||
if (misscount > 10)
|
||||
{
|
||||
ts1.tv_nsec = (random_jkiss() % misscount) * 1000000;
|
||||
ts1.tv_nsec = misscount * 1000000;
|
||||
nanosleep(&ts1, NULL);
|
||||
}
|
||||
}
|
||||
|
@ -26,9 +26,10 @@
|
||||
#include <string.h>
|
||||
#include <sys/stat.h>
|
||||
#include <time.h>
|
||||
|
||||
#include <binlog_common.h>
|
||||
|
||||
#include <glob.h>
|
||||
#include <ini.h>
|
||||
#include <sys/stat.h>
|
||||
#include <avro/errors.h>
|
||||
#include <maxscale/alloc.h>
|
||||
#include <maxscale/atomic.h>
|
||||
#include <maxscale/dcb.h>
|
||||
@ -43,6 +44,7 @@
|
||||
#include <maxscale/service.h>
|
||||
#include <maxscale/spinlock.h>
|
||||
#include <maxscale/utils.h>
|
||||
#include <binlog_common.h>
|
||||
|
||||
#ifndef BINLOG_NAMEFMT
|
||||
#define BINLOG_NAMEFMT "%s.%06d"
|
||||
@ -111,6 +113,7 @@ bool avro_handle_convert(const MODULECMD_ARG *args, json_t** output)
|
||||
return rval;
|
||||
}
|
||||
|
||||
|
||||
static const MXS_ENUM_VALUE codec_values[] =
|
||||
{
|
||||
{"null", MXS_AVRO_CODEC_NULL},
|
||||
@ -120,6 +123,70 @@ static const MXS_ENUM_VALUE codec_values[] =
|
||||
{NULL}
|
||||
};
|
||||
|
||||
static bool do_unlink(const char* format, ...)
|
||||
{
|
||||
va_list args;
|
||||
va_start(args, format);
|
||||
|
||||
char filename[PATH_MAX + 1];
|
||||
vsnprintf(filename, sizeof(filename), format, args);
|
||||
|
||||
va_end(args);
|
||||
|
||||
int rc = unlink(filename);
|
||||
return rc == 0 || rc == ENOENT;
|
||||
}
|
||||
|
||||
static bool do_unlink_with_pattern(const char* format, ...)
|
||||
{
|
||||
bool rval = true;
|
||||
va_list args;
|
||||
va_start(args, format);
|
||||
|
||||
char filename[PATH_MAX + 1];
|
||||
vsnprintf(filename, sizeof(filename), format, args);
|
||||
|
||||
va_end(args);
|
||||
|
||||
glob_t g;
|
||||
int rc = glob(filename, 0, NULL, &g);
|
||||
|
||||
if (rc == 0)
|
||||
{
|
||||
for (size_t i = 0; i < g.gl_pathc; i++)
|
||||
{
|
||||
if (!do_unlink("%s", g.gl_pathv[i]))
|
||||
{
|
||||
rval = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (rc != GLOB_NOMATCH)
|
||||
{
|
||||
modulecmd_set_error("Failed to search '%s': %d, %s",
|
||||
filename, errno, mxs_strerror(errno));
|
||||
rval = false;
|
||||
}
|
||||
|
||||
globfree(&g);
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
||||
static bool avro_handle_purge(const MODULECMD_ARG *args, json_t** output)
|
||||
{
|
||||
AVRO_INSTANCE* inst = (AVRO_INSTANCE*)args->argv[0].value.service->router_instance;
|
||||
|
||||
// First stop the conversion service
|
||||
conversion_task_ctl(inst, false);
|
||||
|
||||
// Then delete the files
|
||||
return do_unlink("%s/%s", inst->avrodir, AVRO_PROGRESS_FILE) && // State file
|
||||
do_unlink("/%s/%s", inst->avrodir, avro_index_name) && // Index database
|
||||
do_unlink_with_pattern("/%s/*.avro", inst->avrodir) && // .avro files
|
||||
do_unlink_with_pattern("/%s/*.avsc", inst->avrodir); // .avsc files
|
||||
}
|
||||
|
||||
/**
|
||||
* The module entry point routine. It is this routine that
|
||||
* must populate the structure that is referred to as the
|
||||
@ -133,15 +200,27 @@ MXS_MODULE* MXS_CREATE_MODULE()
|
||||
spinlock_init(&instlock);
|
||||
instances = NULL;
|
||||
|
||||
static modulecmd_arg_type_t args[] =
|
||||
static modulecmd_arg_type_t args_convert[] =
|
||||
{
|
||||
{ MODULECMD_ARG_SERVICE | MODULECMD_ARG_NAME_MATCHES_DOMAIN, "The avrorouter service" },
|
||||
{ MODULECMD_ARG_STRING, "Action, whether to 'start' or 'stop' the conversion process" }
|
||||
};
|
||||
modulecmd_register_command(MXS_MODULE_NAME, "convert", MODULECMD_TYPE_ACTIVE,
|
||||
avro_handle_convert, 2, args,
|
||||
avro_handle_convert, 2, args_convert,
|
||||
"Start or stop the binlog to avro conversion process");
|
||||
|
||||
static modulecmd_arg_type_t args_purge[] =
|
||||
{
|
||||
{
|
||||
MODULECMD_ARG_SERVICE | MODULECMD_ARG_NAME_MATCHES_DOMAIN,
|
||||
"The avrorouter service to purge (NOTE: THIS REMOVES ALL CONVERTED FILES)"
|
||||
}
|
||||
};
|
||||
modulecmd_register_command(MXS_MODULE_NAME, "purge", MODULECMD_TYPE_ACTIVE,
|
||||
avro_handle_purge, 1, args_purge,
|
||||
"Purge created Avro files and reset conversion state. "
|
||||
"NOTE: MaxScale must be restarted after this call.");
|
||||
|
||||
static MXS_ROUTER_OBJECT MyObject =
|
||||
{
|
||||
createInstance,
|
||||
|
@ -123,10 +123,13 @@ char* json_new_schema_from_table(TABLE_MAP *map)
|
||||
json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:o}", "name", avro_event_type,
|
||||
"type", event_types));
|
||||
|
||||
for (uint64_t i = 0; i < map->columns; i++)
|
||||
for (uint64_t i = 0; i < map->columns && i < create->columns; i++)
|
||||
{
|
||||
ss_info_dassert(create->column_names[i] && *create->column_names[i],
|
||||
"Column name should not be empty or NULL");
|
||||
ss_info_dassert(create->column_types[i] && *create->column_types[i],
|
||||
"Column type should not be empty or NULL");
|
||||
|
||||
json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s, s:s, s:i}",
|
||||
"name", create->column_names[i],
|
||||
"type", column_type_to_avro_type(map->column_types[i]),
|
||||
|
@ -507,7 +507,7 @@ closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session)
|
||||
|
||||
/** Log routing failure due to closed session */
|
||||
static void log_closed_session(mxs_mysql_cmd_t mysql_command, bool is_closed,
|
||||
SERVER_REF *ref)
|
||||
SERVER_REF *ref, bool valid)
|
||||
{
|
||||
char msg[MAX_SERVER_ADDRESS_LEN + 200] = ""; // Extra space for message
|
||||
|
||||
@ -523,11 +523,44 @@ static void log_closed_session(mxs_mysql_cmd_t mysql_command, bool is_closed,
|
||||
{
|
||||
sprintf(msg, "Server '%s' is in maintenance.", ref->server->unique_name);
|
||||
}
|
||||
else if (!valid)
|
||||
{
|
||||
sprintf(msg, "Server '%s' no longer qualifies as a target server.",
|
||||
ref->server->unique_name);
|
||||
}
|
||||
|
||||
MXS_ERROR("Failed to route MySQL command %d to backend server. %s",
|
||||
mysql_command, msg);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the server we're connected to is still valid
|
||||
*
|
||||
* @param inst Router instance
|
||||
* @param router_cli_ses Router session
|
||||
*
|
||||
* @return True if the backend connection is still valid
|
||||
*/
|
||||
static inline bool connection_is_valid(ROUTER_INSTANCE* inst, ROUTER_CLIENT_SES* router_cli_ses)
|
||||
{
|
||||
bool rval = false;
|
||||
|
||||
if (SERVER_IS_RUNNING(router_cli_ses->backend->server) &&
|
||||
(router_cli_ses->backend->server->status & inst->bitmask & inst->bitvalue))
|
||||
{
|
||||
if (inst->bitvalue & SERVER_MASTER)
|
||||
{
|
||||
rval = router_cli_ses->backend == get_root_master(inst->service->dbref);
|
||||
}
|
||||
else
|
||||
{
|
||||
rval = true;
|
||||
}
|
||||
}
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
||||
/**
|
||||
* We have data from the client, we must route it to the backend.
|
||||
* This is simply a case of sending it to the connection that was
|
||||
@ -571,10 +604,12 @@ routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *queu
|
||||
rses_end_locked_router_action(router_cli_ses);
|
||||
}
|
||||
|
||||
bool valid;
|
||||
|
||||
if (rses_is_closed || backend_dcb == NULL ||
|
||||
!SERVER_IS_RUNNING(router_cli_ses->backend->server))
|
||||
(valid = !connection_is_valid(inst, router_cli_ses)))
|
||||
{
|
||||
log_closed_session(mysql_command, rses_is_closed, router_cli_ses->backend);
|
||||
log_closed_session(mysql_command, rses_is_closed, router_cli_ses->backend, valid);
|
||||
gwbuf_free(queue);
|
||||
goto return_rc;
|
||||
|
||||
|
Reference in New Issue
Block a user