Merge branch '2.1' into 2.2

This commit is contained in:
Markus Mäkelä
2018-01-16 09:47:49 +02:00
15 changed files with 250 additions and 57 deletions

View File

@ -23,6 +23,7 @@
*/
#include "avrorouter.h"
#include <maxscale/query_classifier.h>
#include <binlog_common.h>
#include <blr_constants.h>
@ -1038,6 +1039,25 @@ void handle_query_event(AVRO_INSTANCE *router, REP_HEADER *hdr, int *pending_tra
len = tmpsz;
unify_whitespace(sql, len);
static bool warn_not_row_format = true;
if (warn_not_row_format)
{
GWBUF* buffer = gwbuf_alloc(len + 5);
gw_mysql_set_byte3(GWBUF_DATA(buffer), len + 1);
GWBUF_DATA(buffer)[4] = 0x03;
memcpy(GWBUF_DATA(buffer) + 5, sql, len);
qc_query_op_t op = qc_get_operation(buffer);
gwbuf_free(buffer);
if (op == QUERY_OP_UPDATE || op == QUERY_OP_INSERT || op == QUERY_OP_DELETE)
{
MXS_WARNING("Possible STATEMENT or MIXED format binary log. Check that "
"'binlog_format' is set to ROW on the master.");
warn_not_row_format = false;
}
}
if (is_create_table_statement(router, sql, len))
{
TABLE_CREATE *created = NULL;

View File

@ -17,6 +17,7 @@
#include <jansson.h>
#include <maxscale/alloc.h>
#include <strings.h>
#include <signal.h>
#include <maxscale/utils.h>
#define WRITE_EVENT 0
@ -107,6 +108,15 @@ bool handle_table_map_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr
TABLE_MAP *old = hashtable_fetch(router->table_maps, table_ident);
TABLE_MAP *map = table_map_alloc(ptr, ev_len, create);
MXS_ABORT_IF_NULL(map); // Fatal error at this point
if (old && old->id == map->id && old->version == map->version &&
strcmp(old->table, map->table) == 0 &&
strcmp(old->database, map->database) == 0)
{
table_map_free(map);
return true;
}
char* json_schema = json_new_schema_from_table(map);
if (json_schema)
@ -487,6 +497,19 @@ int get_metadata_len(uint8_t type)
}
}
// Make sure that both `i` and `trace` are defined before using this macro
#define check_overflow(t) do \
{ \
if (!(t)) \
{ \
for (long x = 0; x < i;x++) \
{ \
MXS_ALERT("%s", trace[x]); \
} \
raise(SIGABRT); \
} \
}while(false)
/**
* @brief Extract the values from a single row in a row event
*
@ -503,7 +526,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
{
int npresent = 0;
avro_value_t field;
long ncolumns = map->columns;
long ncolumns = MXS_MIN(map->columns, create->columns);
uint8_t *metadata = map->column_metadata;
size_t metadata_offset = 0;
@ -516,7 +539,10 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
ptr += (ncolumns + 7) / 8;
ss_dassert(ptr < end);
for (long i = 0; i < map->columns && i < create->columns && npresent < ncolumns; i++)
char trace[ncolumns][768];
memset(trace, 0, sizeof(trace));
for (long i = 0; i < ncolumns && npresent < ncolumns; i++)
{
ss_debug(int rc = )avro_value_get_by_name(record, create->column_names[i], &field, NULL);
ss_dassert(rc == 0);
@ -526,7 +552,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
npresent++;
if (bit_is_set(null_bitmap, ncolumns, i))
{
MXS_INFO("[%ld] NULL", i);
sprintf(trace[i], "[%ld] NULL", i);
if (column_is_blob(map->column_types[i]))
{
uint8_t nullvalue = 0;
@ -548,9 +574,9 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
char strval[bytes * 2 + 1];
gw_bin2hex(strval, val, bytes);
avro_value_set_string(&field, strval);
MXS_INFO("[%ld] ENUM: %lu bytes", i, bytes);
sprintf(trace[i], "[%ld] ENUM: %lu bytes", i, bytes);
ptr += bytes;
ss_dassert(ptr < end);
check_overflow(ptr < end);
}
else
{
@ -580,13 +606,13 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
bytes = *ptr++;
}
MXS_INFO("[%ld] CHAR: field: %d bytes, data: %d bytes", i, field_length, bytes);
sprintf(trace[i], "[%ld] CHAR: field: %d bytes, data: %d bytes", i, field_length, bytes);
char str[bytes + 1];
memcpy(str, ptr, bytes);
str[bytes] = '\0';
avro_value_set_string(&field, str);
ptr += bytes;
ss_dassert(ptr < end);
check_overflow(ptr < end);
}
}
else if (column_is_bit(map->column_types[i]))
@ -603,17 +629,17 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
MXS_WARNING("BIT is not currently supported, values are stored as 0.");
}
avro_value_set_int(&field, value);
MXS_INFO("[%ld] BIT", i);
sprintf(trace[i], "[%ld] BIT", i);
ptr += bytes;
ss_dassert(ptr < end);
check_overflow(ptr < end);
}
else if (column_is_decimal(map->column_types[i]))
{
double f_value = 0.0;
ptr += unpack_decimal_field(ptr, metadata + metadata_offset, &f_value);
avro_value_set_double(&field, f_value);
MXS_INFO("[%ld] DOUBLE", i);
ss_dassert(ptr < end);
sprintf(trace[i], "[%ld] DECIMAL", i);
check_overflow(ptr < end);
}
else if (column_is_variable_string(map->column_types[i]))
{
@ -630,13 +656,13 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
ptr++;
}
MXS_INFO("[%ld] VARCHAR: field: %d bytes, data: %lu bytes", i, bytes, sz);
sprintf(trace[i], "[%ld] VARCHAR: field: %d bytes, data: %lu bytes", i, bytes, sz);
char buf[sz + 1];
memcpy(buf, ptr, sz);
buf[sz] = '\0';
ptr += sz;
avro_value_set_string(&field, buf);
ss_dassert(ptr < end);
check_overflow(ptr < end);
}
else if (column_is_blob(map->column_types[i]))
{
@ -644,7 +670,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
uint64_t len = 0;
memcpy(&len, ptr, bytes);
ptr += bytes;
MXS_INFO("[%ld] BLOB: field: %d bytes, data: %lu bytes", i, bytes, len);
sprintf(trace[i], "[%ld] BLOB: field: %d bytes, data: %lu bytes", i, bytes, len);
if (len)
{
avro_value_set_bytes(&field, ptr, len);
@ -655,7 +681,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
uint8_t nullvalue = 0;
avro_value_set_bytes(&field, &nullvalue, 1);
}
ss_dassert(ptr < end);
check_overflow(ptr < end);
}
else if (column_is_temporal(map->column_types[i]))
{
@ -666,8 +692,8 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
create->column_lengths[i], &tm);
format_temporal_value(buf, sizeof(buf), map->column_types[i], &tm);
avro_value_set_string(&field, buf);
MXS_INFO("[%ld] %s: %s", i, column_type_to_string(map->column_types[i]), buf);
ss_dassert(ptr < end);
sprintf(trace[i], "[%ld] %s: %s", i, column_type_to_string(map->column_types[i]), buf);
check_overflow(ptr < end);
}
/** All numeric types (INT, LONG, FLOAT etc.) */
else
@ -677,11 +703,18 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
ptr += unpack_numeric_field(ptr, map->column_types[i],
&metadata[metadata_offset], lval);
set_numeric_field_value(&field, map->column_types[i], &metadata[metadata_offset], lval);
ss_dassert(ptr < end);
sprintf(trace[i], "[%ld] %s", i, column_type_to_string(map->column_types[i]));
check_overflow(ptr < end);
}
ss_dassert(metadata_offset <= map->column_metadata_size);
metadata_offset += get_metadata_len(map->column_types[i]);
}
else
{
sprintf(trace[i], "[%ld] %s: Not present", i, column_type_to_string(map->column_types[i]));
}
MXS_INFO("%s", trace[i]);
}
return ptr;

View File

@ -311,10 +311,6 @@ void save_avro_schema(const char *path, const char* schema, TABLE_MAP *map)
}
}
}
else
{
MXS_NOTICE("Schema version %d already exists: %s", map->version, filepath);
}
}
/**
@ -421,28 +417,33 @@ static bool get_database_name(const char* sql, char* dest)
if (ptr)
{
ptr--;
while (*ptr == '`' || isspace(*ptr))
while (ptr >= sql && (*ptr == '`' || isspace(*ptr)))
{
ptr--;
}
while (*ptr != '`' && *ptr != '.' && !isspace(*ptr))
while (ptr >= sql && *ptr != '`' && *ptr != '.' && !isspace(*ptr))
{
ptr--;
}
if (*ptr == '.')
while (ptr >= sql && (*ptr == '`' || isspace(*ptr)))
{
ptr--;
}
if (ptr >= sql && *ptr == '.')
{
// The query defines an explicit database
while (*ptr == '`' || *ptr == '.' || isspace(*ptr))
while (ptr >= sql && (*ptr == '`' || *ptr == '.' || isspace(*ptr)))
{
ptr--;
}
const char* end = ptr + 1;
while (*ptr != '`' && *ptr != '.' && !isspace(*ptr))
while (ptr >= sql && *ptr != '`' && *ptr != '.' && !isspace(*ptr))
{
ptr--;
}
@ -702,6 +703,21 @@ TABLE_CREATE* table_create_from_schema(const char* file, const char* db,
return newtable;
}
int resolve_table_version(const char* db, const char* table)
{
int version = 0;
char buf[PATH_MAX + 1];
do
{
version++;
snprintf(buf, sizeof(buf), "%s.%s.%06d.avsc", db, table, version);
}
while (access(buf, F_OK) == 0);
return version;
}
/**
* @brief Handle a query event which contains a CREATE TABLE statement
* @param sql Query SQL
@ -757,7 +773,7 @@ TABLE_CREATE* table_create_alloc(const char* sql, int len, const char* db)
{
if ((rval = MXS_MALLOC(sizeof(TABLE_CREATE))))
{
rval->version = 1;
rval->version = resolve_table_version(db, table);
rval->was_used = false;
rval->column_names = names;
rval->column_lengths = lengths;
@ -1139,7 +1155,7 @@ void read_alter_identifier(const char *sql, const char *end, char *dest, int siz
int len = 0;
const char *tok = get_tok(sql, &len, end); // ALTER
if (tok && (tok = get_tok(tok + len, &len, end)) // TABLE
&& (tok = get_tok(tok + len, &len, end))) // Table identifier
&& (tok = get_tok(tok + len, &len, end))) // Table identifier
{
snprintf(dest, size, "%.*s", len, tok);
remove_backticks(dest);
@ -1445,6 +1461,8 @@ void table_map_free(TABLE_MAP *map)
if (map)
{
MXS_FREE(map->column_types);
MXS_FREE(map->column_metadata);
MXS_FREE(map->null_bitmap);
MXS_FREE(map->database);
MXS_FREE(map->table);
MXS_FREE(map);

View File

@ -63,7 +63,7 @@ static int maxinfo_statistics(INFO_INSTANCE *, INFO_SESSION *, GWBUF *);
static int maxinfo_ping(INFO_INSTANCE *, INFO_SESSION *, GWBUF *);
static int maxinfo_execute_query(INFO_INSTANCE *, INFO_SESSION *, char *);
static int handle_url(INFO_INSTANCE *instance, INFO_SESSION *router_session, GWBUF *queue);
static int maxinfo_send_ok(DCB *dcb);
/* The router entry points */
static MXS_ROUTER *createInstance(SERVICE *service, char **options);
@ -348,7 +348,7 @@ execute(MXS_ROUTER *rinstance, MXS_ROUTER_SESSION *router_session, GWBUF *queue)
switch (MYSQL_COMMAND(queue))
{
case MXS_COM_PING:
rc = maxinfo_ping(instance, session, queue);
rc = maxinfo_send_ok(session->dcb);
break;
case MXS_COM_STATISTICS:
rc = maxinfo_statistics(instance, session, queue);
@ -622,7 +622,7 @@ maxinfo_execute_query(INFO_INSTANCE *instance, INFO_SESSION *session, char *sql)
respond_starttime(session->dcb);
return 1;
}
if (strcasecmp(sql, "set names 'utf8'") == 0)
if (strncasecmp(sql, "set names", 9) == 0)
{
return maxinfo_send_ok(session->dcb);
}
@ -630,6 +630,10 @@ maxinfo_execute_query(INFO_INSTANCE *instance, INFO_SESSION *session, char *sql)
{
return maxinfo_send_ok(session->dcb);
}
if (strncasecmp(sql, "set @@session", 13) == 0)
{
return maxinfo_send_ok(session->dcb);
}
if (strncasecmp(sql, "set autocommit", 14) == 0)
{
return maxinfo_send_ok(session->dcb);

View File

@ -354,6 +354,13 @@ exec_flush(DCB *dcb, MAXINFO_TREE *tree)
int i;
char errmsg[120];
sprintf(errmsg, "Unsupported flush command '%s'", tree->value);
if(!tree)
{
maxinfo_send_error(dcb, 0, errmsg);
MXS_ERROR("%s", errmsg);
return;
}
for (i = 0; flush_commands[i].name; i++)
{
if (strcasecmp(flush_commands[i].name, tree->value) == 0)
@ -366,7 +373,6 @@ exec_flush(DCB *dcb, MAXINFO_TREE *tree)
{
tree->value[80] = 0;
}
sprintf(errmsg, "Unsupported flush command '%s'", tree->value);
maxinfo_send_error(dcb, 0, errmsg);
MXS_ERROR("%s", errmsg);
}