diff --git a/maxscale-system-test/cdc_connector.cpp b/maxscale-system-test/cdc_connector.cpp index 0800d698e..b811c2297 100644 --- a/maxscale-system-test/cdc_connector.cpp +++ b/maxscale-system-test/cdc_connector.cpp @@ -204,6 +204,14 @@ bool Connection::readRow(std::string& dest) else { dest += buf; + + if (dest[0] == 'E' && dest[1] == 'R' & dest[2] == 'R') + { + m_error = "Server responded with an error: "; + m_error += dest; + rval = false; + break; + } } } diff --git a/maxscale-system-test/cdc_datatypes/cdc_datatypes.cpp b/maxscale-system-test/cdc_datatypes/cdc_datatypes.cpp index 97b5191a2..f9a29f379 100644 --- a/maxscale-system-test/cdc_datatypes/cdc_datatypes.cpp +++ b/maxscale-system-test/cdc_datatypes/cdc_datatypes.cpp @@ -143,6 +143,7 @@ bool run_test(TestConnections& test) { bool rval = true; + test.tprintf("Inserting data"); for (int x = 0; test_set[x].types; x++) { for (int i = 0; test_set[x].types[i]; i++) @@ -152,6 +153,7 @@ bool run_test(TestConnections& test) } } + test.tprintf("Waiting for avrorouter to process data"); test.repl->connect(); execute_query(test.repl->nodes[0], "FLUSH LOGS"); test.repl->close_connections(); @@ -196,6 +198,7 @@ bool run_test(TestConnections& test) std::string err = conn.getError(); test.tprintf("Failed to request data: %s", err.c_str()); rval = false; + break; } test.stop_timeout(); } @@ -209,8 +212,7 @@ int main(int argc, char *argv[]) TestConnections::check_nodes(false); TestConnections test(argc, argv); - test.start_binlog(); - test.restart_maxscale(); + test.replicate_from_master(); if (!run_test(test)) { diff --git a/server/modules/routing/avro/avro.c b/server/modules/routing/avro/avro.c index 3306b4052..6e5c74674 100644 --- a/server/modules/routing/avro/avro.c +++ b/server/modules/routing/avro/avro.c @@ -333,6 +333,11 @@ createInstance(SERVICE *service, char **options) inst->row_target = AVRO_DEFAULT_BLOCK_ROW_COUNT; inst->trx_target = AVRO_DEFAULT_BLOCK_TRX_COUNT; inst->block_size = 0; + inst->gtid.domain = 0; + inst->gtid.event_num = 0; + inst->gtid.seq = 0; + inst->gtid.server_id = 0; + inst->gtid.timestamp = 0; int first_file = 1; bool err = false; diff --git a/server/modules/routing/avro/avro_file.c b/server/modules/routing/avro/avro_file.c index fc8834cff..d8f6d1462 100644 --- a/server/modules/routing/avro/avro_file.c +++ b/server/modules/routing/avro/avro_file.c @@ -114,6 +114,7 @@ AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema, size &table->avro_schema)) { MXS_ERROR("Avro error: %s", avro_strerror()); + MXS_INFO("Avro schema: %s", json_schema); free(table); return NULL; } diff --git a/server/modules/routing/avro/avro_index.c b/server/modules/routing/avro/avro_index.c index d5ede4944..09d5bbb92 100644 --- a/server/modules/routing/avro/avro_index.c +++ b/server/modules/routing/avro/avro_index.c @@ -204,7 +204,7 @@ void avro_update_index(AVRO_INSTANCE* router) /** The SQL for the in-memory used_tables table */ static const char *insert_sql = "INSERT OR IGNORE INTO "MEMORY_TABLE_NAME "(domain, server_id, sequence, binlog_timestamp, table_name)" - " VALUES (%lu, %lu, %lu, %lu, \"%s\")"; + " VALUES (%lu, %lu, %lu, %u, \"%s\")"; /** * @brief Add a used table to the current transaction diff --git a/server/modules/routing/avro/avro_schema.c b/server/modules/routing/avro/avro_schema.c index 836144718..156bf94a9 100644 --- a/server/modules/routing/avro/avro_schema.c +++ b/server/modules/routing/avro/avro_schema.c @@ -126,6 +126,8 @@ char* json_new_schema_from_table(TABLE_MAP *map) for (uint64_t i = 0; i < map->columns; i++) { + ss_info_dassert(create->column_names[i] && *create->column_names[i], + "Column name should not be empty or NULL"); json_array_append(array, json_pack_ex(&err, 0, "{s:s, s:s, s:s, s:i}", "name", create->column_names[i], "type", column_type_to_avro_type(map->column_types[i]), @@ -523,6 +525,7 @@ static const char *extract_field_name(const char* ptr, char* dest, size_t size) dest[bytes] = '\0'; make_valid_avro_identifier(dest); + ss_dassert(strlen(dest) > 0); } else { @@ -535,7 +538,7 @@ static const char *extract_field_name(const char* ptr, char* dest, size_t size) int extract_type_length(const char* ptr, char *dest) { /** Skip any leading whitespace */ - while (isspace(*ptr) || *ptr == '`') + while (*ptr && (isspace(*ptr) || *ptr == '`')) { ptr++; } @@ -545,7 +548,7 @@ int extract_type_length(const char* ptr, char *dest) /** Skip characters until we either hit a whitespace character or the start * of the length definition. */ - while (!isspace(*ptr) && *ptr != '(') + while (*ptr && !isspace(*ptr) && *ptr != '(') { ptr++; } @@ -556,7 +559,7 @@ int extract_type_length(const char* ptr, char *dest) dest[typelen] = '\0'; /** Skip whitespace */ - while (isspace(*ptr)) + while (*ptr && isspace(*ptr)) { ptr++; } @@ -621,6 +624,7 @@ static int process_column_definition(const char *nameptr, char*** dest, char*** lengths[i] = len; types[i] = strdup(type); names[i] = strdup(colname); + ss_info_dassert(*names[i] && *types[i], "`name` and `type` must not be empty"); i++; }