Merge branch 'develop' into MXS-1266

This commit is contained in:
MassimilianoPinto
2017-05-25 08:43:10 +02:00
682 changed files with 51954 additions and 1257 deletions

View File

@ -473,6 +473,17 @@ void notify_all_clients(AVRO_INSTANCE *router)
}
}
void do_checkpoint(AVRO_INSTANCE *router, uint64_t *total_rows, uint64_t *total_commits)
{
update_used_tables(router);
avro_flush_all_tables(router, AVROROUTER_FLUSH);
avro_save_conversion_state(router);
notify_all_clients(router);
*total_rows += router->row_count;
*total_commits += router->trx_count;
router->row_count = router->trx_count = 0;
}
/**
* @brief Read all replication events from a binlog file.
*
@ -552,6 +563,8 @@ avro_binlog_end_t avro_read_all_events(AVRO_INSTANCE *router)
}
else
{
do_checkpoint(router, &total_rows, &total_commits);
MXS_INFO("Processed %lu transactions and %lu row events.",
total_commits, total_rows);
if (rotate_seen)
@ -739,13 +752,7 @@ avro_binlog_end_t avro_read_all_events(AVRO_INSTANCE *router)
if (router->row_count >= router->row_target ||
router->trx_count >= router->trx_target)
{
update_used_tables(router);
avro_flush_all_tables(router, AVROROUTER_SYNC);
avro_save_conversion_state(router);
notify_all_clients(router);
total_rows += router->row_count;
total_commits += router->trx_count;
router->row_count = router->trx_count = 0;
do_checkpoint(router, &total_rows, &total_commits);
}
}

View File

@ -180,6 +180,11 @@ bool handle_table_map_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr
"table until a DDL statement for it is read.", table_ident);
}
if (rval)
{
MXS_INFO("Table Map for '%s' at %lu", table_ident, router->current_pos);
}
return rval;
}
@ -307,9 +312,13 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
* beforehand so we must continue processing them until we reach the end
* of the event. */
int rows = 0;
MXS_INFO("Row Event for '%s' at %lu", table_ident, router->current_pos);
while (ptr - start < hdr->event_size - BINLOG_EVENT_HDR_LEN)
{
static uint64_t total_row_count = 1;
MXS_INFO("Row %lu", total_row_count++);
/** Add the current GTID and timestamp */
uint8_t *end = ptr + hdr->event_size - BINLOG_EVENT_HDR_LEN;
int event_type = get_event_type(hdr->event_type);
@ -525,9 +534,8 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
ptr += (ncolumns + 7) / 8;
ss_dassert(ptr < end);
for (long i = 0; i < map->columns && npresent < ncolumns; i++)
for (long i = 0; i < map->columns && i < create->columns && npresent < ncolumns; i++)
{
ss_dassert(create->columns == map->columns);
ss_debug(int rc = )avro_value_get_by_name(record, create->column_names[i], &field, NULL);
ss_dassert(rc == 0);
@ -536,6 +544,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
npresent++;
if (bit_is_set(null_bitmap, ncolumns, i))
{
MXS_INFO("[%ld] NULL", i);
if (column_is_blob(map->column_types[i]))
{
uint8_t nullvalue = 0;
@ -565,17 +574,45 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
MXS_WARNING("ENUM/SET values larger than 255 values aren't supported.");
}
avro_value_set_string(&field, strval);
MXS_INFO("[%ld] ENUM: %lu bytes", i, bytes);
ptr += bytes;
ss_dassert(ptr < end);
}
else
{
uint8_t bytes = *ptr;
/**
* The first byte in the metadata stores the real type of
* the string (ENUM and SET types are also stored as fixed
* length strings).
*
* The first two bits of the second byte contain the XOR'ed
* field length but as that information is not relevant for
* us, we just use this information to know whether to read
* one or two bytes for string length.
*/
uint16_t meta = metadata[metadata_offset + 1] + (metadata[metadata_offset] << 8);
int bytes = 0;
uint16_t extra_length = (((meta >> 4) & 0x300) ^ 0x300);
uint16_t field_length = (meta & 0xff) + extra_length;
if (field_length > 255)
{
bytes = ptr[0] + (ptr[1] << 8);
ptr += 2;
}
else
{
bytes = *ptr++;
}
MXS_INFO("[%ld] CHAR: field: %d bytes, data: %d bytes", i, field_length, bytes);
ss_dassert(bytes || *ptr == '\0');
char str[bytes + 1];
memcpy(str, ptr + 1, bytes);
memcpy(str, ptr, bytes);
str[bytes] = '\0';
avro_value_set_string(&field, str);
ptr += bytes + 1;
ptr += bytes;
ss_dassert(ptr < end);
}
}
@ -595,6 +632,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
MXS_WARNING("BIT is not currently supported, values are stored as 0.");
}
avro_value_set_int(&field, value);
MXS_INFO("[%ld] BIT", i);
ptr += bytes;
ss_dassert(ptr < end);
}
@ -603,6 +641,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
double f_value = 0.0;
ptr += unpack_decimal_field(ptr, metadata + metadata_offset, &f_value);
avro_value_set_double(&field, f_value);
MXS_INFO("[%ld] DOUBLE", i);
ss_dassert(ptr < end);
}
else if (column_is_variable_string(map->column_types[i]))
@ -620,6 +659,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
ptr++;
}
MXS_INFO("[%ld] VARCHAR: field: %d bytes, data: %lu bytes", i, bytes, sz);
char buf[sz + 1];
memcpy(buf, ptr, sz);
buf[sz] = '\0';
@ -633,6 +673,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
uint64_t len = 0;
memcpy(&len, ptr, bytes);
ptr += bytes;
MXS_INFO("[%ld] BLOB: field: %d bytes, data: %lu bytes", i, bytes, len);
if (len)
{
avro_value_set_bytes(&field, ptr, len);
@ -649,9 +690,12 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
{
char buf[80];
struct tm tm;
ptr += unpack_temporal_value(map->column_types[i], ptr, &metadata[metadata_offset], &tm);
ptr += unpack_temporal_value(map->column_types[i], ptr,
&metadata[metadata_offset],
create->column_lengths[i], &tm);
format_temporal_value(buf, sizeof(buf), map->column_types[i], &tm);
avro_value_set_string(&field, buf);
MXS_INFO("[%ld] TEMPORAL: %s", i, buf);
ss_dassert(ptr < end);
}
/** All numeric types (INT, LONG, FLOAT etc.) */

View File

@ -124,9 +124,11 @@ char* json_new_schema_from_table(TABLE_MAP *map)
for (uint64_t i = 0; i < map->columns; i++)
{
json_array_append(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name",
create->column_names[i], "type",
column_type_to_avro_type(map->column_types[i])));
json_array_append(array, json_pack_ex(&err, 0, "{s:s, s:s, s:s, s:i}",
"name", create->column_names[i],
"type", column_type_to_avro_type(map->column_types[i]),
"real_type", create->column_types[i],
"length", create->column_lengths[i]));
}
json_object_set_new(schema, "fields", array);
char* rval = json_dumps(schema, JSON_PRESERVE_ORDER);
@ -172,8 +174,10 @@ bool json_extract_field_names(const char* filename, TABLE_CREATE *table)
{
int array_size = json_array_size(arr);
table->column_names = (char**)MXS_MALLOC(sizeof(char*) * (array_size));
table->column_types = (char**)MXS_MALLOC(sizeof(char*) * (array_size));
table->column_lengths = (int*)MXS_MALLOC(sizeof(int) * (array_size));
if (table->column_names)
if (table->column_names && table->column_types && table->column_lengths)
{
int columns = 0;
rval = true;
@ -184,6 +188,28 @@ bool json_extract_field_names(const char* filename, TABLE_CREATE *table)
if (json_is_object(val))
{
json_t* value;
if ((value = json_object_get(val, "real_type")) && json_is_string(value))
{
table->column_types[columns] = MXS_STRDUP_A(json_string_value(value));
}
else
{
table->column_types[columns] = MXS_STRDUP_A("unknown");
MXS_WARNING("No \"real_type\" value defined. Treating as unknown type field.");
}
if ((value = json_object_get(val, "length")) && json_is_integer(value))
{
table->column_lengths[columns] = json_integer_value(value);
}
else
{
table->column_lengths[columns] = -1;
MXS_WARNING("No \"length\" value defined. Treating as default length field.");
}
json_t *name = json_object_get(val, "name");
if (name && json_is_string(name))
{
@ -489,7 +515,6 @@ static const char *extract_field_name(const char* ptr, char* dest, size_t size)
dest[bytes] = '\0';
make_valid_avro_identifier(dest);
ptr = next_field_definition(ptr);
}
else
{
@ -499,56 +524,98 @@ static const char *extract_field_name(const char* ptr, char* dest, size_t size)
return ptr;
}
int extract_type_length(const char* ptr, char *dest)
{
/** Skip any leading whitespace */
while (isspace(*ptr) || *ptr == '`')
{
ptr++;
}
/** The field type definition starts here */
const char *start = ptr;
/** Skip characters until we either hit a whitespace character or the start
* of the length definition. */
while (!isspace(*ptr) && *ptr != '(')
{
ptr++;
}
/** Store type */
int typelen = ptr - start;
memcpy(dest, start, typelen);
dest[typelen] = '\0';
/** Skip whitespace */
while (isspace(*ptr))
{
ptr++;
}
int rval = -1; // No length defined
/** Start of length definition */
if (*ptr == '(')
{
ptr++;
char *end;
int val = strtol(ptr, &end, 10);
if (*end == ')')
{
rval = val;
}
}
return rval;
}
int count_columns(const char* ptr)
{
int i = 2;
while ((ptr = strchr(ptr, ',')))
{
ptr++;
i++;
}
return i;
}
/**
* Process a table definition into an array of column names
* @param nameptr table definition
* @return Number of processed columns or -1 on error
*/
static int process_column_definition(const char *nameptr, char*** dest)
static int process_column_definition(const char *nameptr, char*** dest, char*** dest_types, int** dest_lens)
{
/** Process columns in groups of 8 */
size_t chunks = 1;
const size_t chunk_size = 8;
int i = 0;
char **names = MXS_MALLOC(sizeof(char*) * (chunks * chunk_size + 1));
if (names == NULL)
{
return -1;
}
int n = count_columns(nameptr);
*dest = MXS_MALLOC(sizeof(char*) * n);
*dest_types = MXS_MALLOC(sizeof(char*) * n);
*dest_lens = MXS_MALLOC(sizeof(int) * n);
char **names = *dest;
char **types = *dest_types;
int *lengths = *dest_lens;
char colname[512];
int i = 0;
while ((nameptr = extract_field_name(nameptr, colname, sizeof(colname))))
{
if (i >= chunks * chunk_size)
{
char **tmp = MXS_REALLOC(names, (++chunks * chunk_size + 1) * sizeof(char*));
if (tmp == NULL)
{
for (int x = 0; x < i; x++)
{
MXS_FREE(names[x]);
}
MXS_FREE(names);
return -1;
}
names = tmp;
}
ss_dassert(i < n);
char type[100] = "";
int len = extract_type_length(nameptr, type);
nameptr = next_field_definition(nameptr);
fix_reserved_word(colname);
if ((names[i++] = MXS_STRDUP(colname)) == NULL)
{
for (int x = 0; x < i; x++)
{
MXS_FREE(names[x]);
}
MXS_FREE(names);
return -1;
}
lengths[i] = len;
types[i] = MXS_STRDUP_A(type);
names[i] = MXS_STRDUP_A(colname);
i++;
}
*dest = names;
return i;
}
@ -601,7 +668,7 @@ TABLE_CREATE* table_create_alloc(const char* sql, const char* event_db)
char database[MYSQL_DATABASE_MAXLEN + 1];
const char *db = event_db;
MXS_DEBUG("Create table statement: %.*s", stmt_len, statement_sql);
MXS_INFO("Create table: %s", sql);
if (!get_table_name(sql, table))
{
@ -621,8 +688,10 @@ TABLE_CREATE* table_create_alloc(const char* sql, const char* event_db)
db = database;
}
int* lengths = NULL;
char **names = NULL;
int n_columns = process_column_definition(statement_sql, &names);
char **types = NULL;
int n_columns = process_column_definition(statement_sql, &names, &types, &lengths);
ss_dassert(n_columns > 0);
/** We have appear to have a valid CREATE TABLE statement */
@ -634,6 +703,8 @@ TABLE_CREATE* table_create_alloc(const char* sql, const char* event_db)
rval->version = 1;
rval->was_used = false;
rval->column_names = names;
rval->column_lengths = lengths;
rval->column_types = types;
rval->columns = n_columns;
rval->database = MXS_STRDUP(db);
rval->table = MXS_STRDUP(table);
@ -675,8 +746,11 @@ void table_create_free(TABLE_CREATE* value)
for (uint64_t i = 0; i < value->columns; i++)
{
MXS_FREE(value->column_names[i]);
MXS_FREE(value->column_types[i]);
}
MXS_FREE(value->column_names);
MXS_FREE(value->column_types);
MXS_FREE(value->column_lengths);
MXS_FREE(value->table);
MXS_FREE(value->database);
MXS_FREE(value);
@ -792,6 +866,26 @@ void make_avro_token(char* dest, const char* src, int length)
memcpy(dest, src, length);
dest[length] = '\0';
fix_reserved_word(dest);
}
int get_column_index(TABLE_CREATE *create, const char *tok)
{
int idx = -1;
char safe_tok[strlen(tok) + 2];
strcpy(safe_tok, tok);
fix_reserved_word(safe_tok);
for (int x = 0; x < create->columns; x++)
{
if (strcasecmp(create->column_names[x], tok) == 0)
{
idx = x;
break;
}
}
return idx;
}
bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end)
@ -805,7 +899,7 @@ bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end)
if (tok)
{
MXS_DEBUG("Altering table %.*s\n", len, tok);
MXS_INFO("Alter table '%.*s'; %.*s\n", len, tok, (int)(end - sql), sql);
def = tok + len;
}
@ -844,27 +938,45 @@ bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end)
{
tok = get_tok(tok + len, &len, end);
MXS_FREE(create->column_names[create->columns - 1]);
char ** tmp = MXS_REALLOC(create->column_names, sizeof(char*) * create->columns - 1);
ss_dassert(tmp);
int idx = get_column_index(create, tok);
if (tmp == NULL)
if (idx != -1)
{
return false;
MXS_FREE(create->column_names[idx]);
for (int i = idx; i < (int)create->columns - 1; i++)
{
create->column_names[i] = create->column_names[i + 1];
}
char ** tmp = realloc(create->column_names, sizeof(char*) * create->columns - 1);
ss_dassert(tmp);
if (tmp == NULL)
{
return false;
}
create->column_names = tmp;
create->columns--;
updates++;
}
create->column_names = tmp;
create->columns--;
updates++;
tok = get_next_def(tok, end);
len = 0;
}
else if (tok_eq(ptok, "change", plen) && tok_eq(tok, "column", len))
{
tok = get_tok(tok + len, &len, end);
MXS_FREE(create->column_names[create->columns - 1]);
create->column_names[create->columns - 1] = strndup(tok, len);
updates++;
int idx = get_column_index(create, tok);
if (idx != -1)
{
MXS_FREE(create->column_names[idx]);
create->column_names[idx] = strndup(tok, len);
updates++;
}
tok = get_next_def(tok, end);
len = 0;
}
@ -975,7 +1087,6 @@ TABLE_MAP *table_map_alloc(uint8_t *ptr, uint8_t hdr_len, TABLE_CREATE* create)
map->id = table_id;
map->version = create->version;
map->flags = flags;
ss_dassert(column_count == create->columns);
map->columns = column_count;
map->column_types = MXS_MALLOC(column_count);
/** Allocate at least one byte for the metadata */

View File

@ -84,6 +84,23 @@ static const char *avro_event_type = "event_type";
static const char *avro_timestamp = "timestamp";
static char *avro_client_ouput[] = { "Undefined", "JSON", "Avro" };
static inline bool is_reserved_word(const char* word)
{
return strcasecmp(word, avro_domain) == 0 ||
strcasecmp(word, avro_server_id) == 0 ||
strcasecmp(word, avro_sequence) == 0 ||
strcasecmp(word, avro_event_number) == 0 ||
strcasecmp(word, avro_event_type) == 0 ||
strcasecmp(word, avro_timestamp) == 0;
}
static inline void fix_reserved_word(char *tok)
{
if (is_reserved_word(tok))
{
strcat(tok, "_");
}
}
/** How a binlog file is closed */
typedef enum avro_binlog_end
@ -111,6 +128,8 @@ typedef struct table_create
{
uint64_t columns;
char **column_names;
char **column_types;
int* column_lengths;
char *table;
char *database;
int version; /**< How many versions of this table have been used */

View File

@ -1541,6 +1541,34 @@ static void alterService(DCB *dcb, SERVICE *service, char *v1, char *v2, char *v
}
}
static void alterMaxScale(DCB *dcb, char *v1, char *v2, char *v3,
char *v4, char *v5, char *v6, char *v7, char *v8, char *v9,
char *v10, char *v11)
{
char *values[11] = {v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11};
const int items = sizeof(values) / sizeof(values[0]);
for (int i = 0; i < items && values[i]; i++)
{
char *key = values[i];
char *value = strchr(key, '=');
if (value)
{
*value++ = '\0';
if (!runtime_alter_maxscale(key, value))
{
dcb_printf(dcb, "Error: Bad key-value parameter: %s=%s\n", key, value);
}
}
else
{
dcb_printf(dcb, "Error: not a key-value parameter: %s\n", values[i]);
}
}
}
struct subcommand alteroptions[] =
{
{
@ -1632,6 +1660,27 @@ struct subcommand alteroptions[] =
ARG_TYPE_STRING, ARG_TYPE_STRING, ARG_TYPE_STRING, ARG_TYPE_STRING,
ARG_TYPE_STRING, ARG_TYPE_STRING, ARG_TYPE_STRING, ARG_TYPE_STRING
}
},
{
"maxscale", 1, 11, alterMaxScale,
"Alter maxscale parameters",
"Usage: alter maxscale KEY=VALUE ...\n"
"\n"
"Parameters:\n"
"KEY=VALUE List of `key=value` pairs separated by spaces\n"
"\n"
"The following configuration values can be altered:\n"
"auth_connect_timeout Connection timeout for permission checks\n"
"auth_read_timeout Read timeout for permission checks\n"
"auth_write_timeout Write timeout for permission checks\n"
"admin_auth Enable admin interface authentication\n"
"\n"
"Example: alter maxscale auth_connect_timeout=10",
{
ARG_TYPE_STRING, ARG_TYPE_STRING, ARG_TYPE_STRING, ARG_TYPE_STRING,
ARG_TYPE_STRING, ARG_TYPE_STRING, ARG_TYPE_STRING, ARG_TYPE_STRING,
ARG_TYPE_STRING, ARG_TYPE_STRING, ARG_TYPE_STRING
}
},
{
EMPTY_OPTION

View File

@ -29,6 +29,7 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <inttypes.h>
#include <maxscale/alloc.h>
#include <maxscale/atomic.h>
@ -998,7 +999,7 @@ maxinfo_zombie_dcbs()
/**
* Interface to poll stats for reads
*/
static int
static int64_t
maxinfo_read_events()
{
return poll_get_stat(POLL_STAT_READ);
@ -1007,7 +1008,7 @@ maxinfo_read_events()
/**
* Interface to poll stats for writes
*/
static int
static int64_t
maxinfo_write_events()
{
return poll_get_stat(POLL_STAT_WRITE);
@ -1016,7 +1017,7 @@ maxinfo_write_events()
/**
* Interface to poll stats for errors
*/
static int
static int64_t
maxinfo_error_events()
{
return poll_get_stat(POLL_STAT_ERROR);
@ -1025,7 +1026,7 @@ maxinfo_error_events()
/**
* Interface to poll stats for hangup
*/
static int
static int64_t
maxinfo_hangup_events()
{
return poll_get_stat(POLL_STAT_HANGUP);
@ -1034,7 +1035,7 @@ maxinfo_hangup_events()
/**
* Interface to poll stats for accepts
*/
static int
static int64_t
maxinfo_accept_events()
{
return poll_get_stat(POLL_STAT_ACCEPT);
@ -1043,7 +1044,7 @@ maxinfo_accept_events()
/**
* Interface to poll stats for event queue length
*/
static int
static int64_t
maxinfo_event_queue_length()
{
return poll_get_stat(POLL_STAT_EVQ_LEN);
@ -1052,7 +1053,7 @@ maxinfo_event_queue_length()
/**
* Interface to poll stats for max event queue length
*/
static int
static int64_t
maxinfo_max_event_queue_length()
{
return poll_get_stat(POLL_STAT_EVQ_MAX);
@ -1061,7 +1062,7 @@ maxinfo_max_event_queue_length()
/**
* Interface to poll stats for max queue time
*/
static int
static int64_t
maxinfo_max_event_queue_time()
{
return poll_get_stat(POLL_STAT_MAX_QTIME);
@ -1070,7 +1071,7 @@ maxinfo_max_event_queue_time()
/**
* Interface to poll stats for max event execution time
*/
static int
static int64_t
maxinfo_max_event_exec_time()
{
return poll_get_stat(POLL_STAT_MAX_EXECTIME);
@ -1142,8 +1143,8 @@ status_row(RESULTSET *result, void *data)
(char *)(*status[context->index].func)());
break;
case VT_INT:
snprintf(buf, 80, "%ld",
(long)(*status[context->index].func)());
snprintf(buf, 80, "%" PRId64,
(int64_t)(*status[context->index].func)());
resultset_row_set(row, 1, buf);
break;
default:

View File

@ -13,7 +13,6 @@
#include "readwritesplit.h"
#include <my_config.h>
#include <stdio.h>
#include <strings.h>
#include <string.h>
@ -162,12 +161,12 @@ log_transaction_status(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, qc_query_type_t
{
if (rses->load_data_state == LOAD_DATA_INACTIVE)
{
unsigned char command = MYSQL_GET_COMMAND(GWBUF_DATA(querybuf));
char *qtypestr = qc_typemask_to_string(qtype);
char *sql;
uint8_t *packet = GWBUF_DATA(querybuf);
unsigned char command = packet[4];
int len = 0;
char* sql;
modutil_extract_SQL(querybuf, &sql, &len);
char *qtypestr = qc_typemask_to_string(qtype);
if (len > RWSPLIT_TRACE_MSG_LEN)
{
@ -181,9 +180,9 @@ log_transaction_status(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, qc_query_type_t
const char *hint = querybuf->hint == NULL ? "" : ", Hint:";
const char *hint_type = querybuf->hint == NULL ? "" : STRHINTTYPE(querybuf->hint->type);
MXS_INFO("> Autocommit: %s, trx is %s, cmd: %s, type: %s, stmt: %.*s%s %s",
autocommit, transaction, STRPACKETTYPE(command), querytype, len,
sql, hint, hint_type);
MXS_INFO("> Autocommit: %s, trx is %s, cmd: (0x%02x) %s, type: %s, stmt: %.*s%s %s",
autocommit, transaction, command, STRPACKETTYPE(command),
querytype, len, sql, hint, hint_type);
MXS_FREE(qtypestr);
}