diff --git a/avro/maxavro.h b/avro/maxavro.h index 6cce58666..47d459ee8 100644 --- a/avro/maxavro.h +++ b/avro/maxavro.h @@ -40,6 +40,7 @@ enum maxavro_value_type MAXAVRO_TYPE_BYTES, MAXAVRO_TYPE_ENUM, MAXAVRO_TYPE_NULL, + MAXAVRO_TYPE_UNION, MAXAVRO_TYPE_MAX }; diff --git a/avro/maxavro_internal.h b/avro/maxavro_internal.h index a29aa38a3..1432f13f4 100644 --- a/avro/maxavro_internal.h +++ b/avro/maxavro_internal.h @@ -50,7 +50,12 @@ bool maxavro_datablock_finalize(MAXAVRO_DATABLOCK* block); /** Adding values to a datablock. The caller must ensure that the inserted * values conform to the file schema and that the required amount of fields * is added before finalizing the block. */ -bool maxavro_datablock_add_integer(MAXAVRO_DATABLOCK* file, uint64_t val); -bool maxavro_datablock_add_string(MAXAVRO_DATABLOCK* file, const char* str); -bool maxavro_datablock_add_float(MAXAVRO_DATABLOCK* file, float val); -bool maxavro_datablock_add_double(MAXAVRO_DATABLOCK* file, double val); +bool maxavro_datablock_add_integer(MAXAVRO_DATABLOCK *file, uint64_t val); +bool maxavro_datablock_add_string(MAXAVRO_DATABLOCK *file, const char* str); +bool maxavro_datablock_add_float(MAXAVRO_DATABLOCK *file, float val); +bool maxavro_datablock_add_double(MAXAVRO_DATABLOCK *file, double val); + +bool maxavro_read_datablock_start(MAXAVRO_FILE *file); +bool maxavro_verify_block(MAXAVRO_FILE *file); +const char* type_to_string(enum maxavro_value_type type); +enum maxavro_value_type string_to_type(const char *str); diff --git a/avro/maxavro_record.c b/avro/maxavro_record.c index 69609ef2b..4e18dc3ea 100644 --- a/avro/maxavro_record.c +++ b/avro/maxavro_record.c @@ -18,10 +18,6 @@ #include #include -bool maxavro_read_datablock_start(MAXAVRO_FILE* file); -bool maxavro_verify_block(MAXAVRO_FILE* file); -const char* type_to_string(enum maxavro_value_type type); - /** * @brief Read a single value from a file * @param file File to read from @@ -30,10 +26,10 @@ const char* type_to_string(enum maxavro_value_type type); * @param field_num Field index in the schema * @return JSON object or NULL if an error occurred */ -static json_t* read_and_pack_value(MAXAVRO_FILE* file, MAXAVRO_SCHEMA_FIELD* field) +static json_t* read_and_pack_value(MAXAVRO_FILE *file, MAXAVRO_SCHEMA_FIELD *field, enum maxavro_value_type type) { json_t* value = NULL; - switch (field->type) + switch (type) { case MAXAVRO_TYPE_BOOL: if (file->buffer_ptr < file->buffer_end) @@ -109,6 +105,23 @@ static json_t* read_and_pack_value(MAXAVRO_FILE* file, MAXAVRO_SCHEMA_FIELD* fie } break; + case MAXAVRO_TYPE_UNION: + { + json_t *arr = field->extra; + uint64_t val = 0; + + if (maxavro_read_integer(file, &val) && val < json_array_size(arr)) + { + json_t* union_type = json_object_get(json_array_get(arr, val), "type"); + value = read_and_pack_value(file, field, string_to_type(json_string_value(union_type))); + } + } + break; + + case MAXAVRO_TYPE_NULL: + value = json_null(); + break; + default: MXS_ERROR("Unimplemented type: %d", field->type); break; @@ -174,7 +187,7 @@ json_t* maxavro_record_read_json(MAXAVRO_FILE* file) { for (size_t i = 0; i < file->schema->num_fields; i++) { - json_t* value = read_and_pack_value(file, &file->schema->fields[i]); + json_t* value = read_and_pack_value(file, &file->schema->fields[i], file->schema->fields[i].type); if (value) { json_object_set_new(object, file->schema->fields[i].name, value); diff --git a/avro/maxavro_schema.c b/avro/maxavro_schema.c index f564a9c70..cfcf881c9 100644 --- a/avro/maxavro_schema.c +++ b/avro/maxavro_schema.c @@ -78,6 +78,13 @@ static enum maxavro_value_type unpack_to_type(json_t* object, enum maxavro_value_type rval = MAXAVRO_TYPE_UNKNOWN; json_t* type = NULL; + if (json_is_array(object) && json_is_object(json_array_get(object, 0))) + { + json_incref(object); + field->extra = object; + return MAXAVRO_TYPE_UNION; + } + if (json_is_object(object)) { json_t* tmp = NULL; @@ -191,7 +198,7 @@ static void maxavro_schema_field_free(MAXAVRO_SCHEMA_FIELD* field) if (field) { MXS_FREE(field->name); - if (field->type == MAXAVRO_TYPE_ENUM) + if (field->type == MAXAVRO_TYPE_ENUM || field->type == MAXAVRO_TYPE_UNION) { json_decref((json_t*)field->extra); } diff --git a/connectors/cdc-connector/CMakeLists.txt b/connectors/cdc-connector/CMakeLists.txt index 175eb9f13..346bf25fe 100644 --- a/connectors/cdc-connector/CMakeLists.txt +++ b/connectors/cdc-connector/CMakeLists.txt @@ -2,7 +2,7 @@ add_library(cdc_connector SHARED cdc_connector.cpp) add_dependencies(cdc_connector jansson) target_link_libraries(cdc_connector ${JANSSON_LIBRARIES} crypto) -set_target_properties(cdc_connector PROPERTIES VERSION "1.0.0") +set_target_properties(cdc_connector PROPERTIES VERSION "1.0.1") add_dependencies(cdc_connector jansson) # Static version of the library diff --git a/connectors/cdc-connector/cdc_connector.cpp b/connectors/cdc-connector/cdc_connector.cpp index da20c8e7b..c27cf4339 100644 --- a/connectors/cdc-connector/cdc_connector.cpp +++ b/connectors/cdc-connector/cdc_connector.cpp @@ -362,6 +362,7 @@ void Connection::process_schema(json_t* json) SRow Connection::process_row(json_t* js) { + std::set nulls; ValueVector values; values.reserve(m_keys->size()); m_error.clear(); @@ -373,6 +374,11 @@ SRow Connection::process_row(json_t* js) if (v) { + if (json_is_null(v)) + { + nulls.insert(values.size()); + } + values.push_back(json_to_string(v)); } else @@ -387,7 +393,7 @@ SRow Connection::process_row(json_t* js) if (m_error.empty()) { - rval = SRow(new Row(m_keys, m_types, values)); + rval = SRow(new Row(m_keys, m_types, values, nulls)); } return rval; diff --git a/connectors/cdc-connector/cdc_connector.h b/connectors/cdc-connector/cdc_connector.h index 3b30a5448..321aba00d 100644 --- a/connectors/cdc-connector/cdc_connector.h +++ b/connectors/cdc-connector/cdc_connector.h @@ -23,6 +23,7 @@ #include #include #include +#include #include #include @@ -199,6 +200,31 @@ public: return m_values.at(it - m_keys->begin()); } + /** + * Check if a field has a NULL value + * + * @param i The field index + * + * @return True if the field has a NULL value + */ + bool is_null(size_t i) const + { + return m_nulls.count(i); + } + + /** + * Check if a field has a NULL value + * + * @param str The field name + * + * @return True if the field has a NULL value + */ + bool is_null(const std::string& str) const + { + ValueVector::const_iterator it = std::find(m_keys->begin(), m_keys->end(), str); + return m_nulls.count(it - m_keys->begin()); + } + /** * Get the GTID of this row * @@ -242,16 +268,19 @@ public: private: SValueVector m_keys; SValueVector m_types; - ValueVector m_values; + ValueVector m_values; + std::set m_nulls; // Only a Connection should construct an InternalRow friend class Connection; Row(SValueVector& keys, SValueVector& types, - ValueVector& values) - : m_keys(keys) - , m_types(types) + ValueVector& values, + std::set& nulls): + m_keys(keys), + m_types(types), + m_nulls(nulls) { m_values.swap(values); } diff --git a/maxscale-system-test/CMakeLists.txt b/maxscale-system-test/CMakeLists.txt index 8d605ecc2..73dca58c3 100644 --- a/maxscale-system-test/CMakeLists.txt +++ b/maxscale-system-test/CMakeLists.txt @@ -1042,4 +1042,7 @@ add_test_executable(mxs2111_auth_string.cpp mxs2111_auth_string replication LABE # MXS-2115: Automatic version_string detection add_test_executable(mxs2115_version_string.cpp mxs2115_version_string replication LABELS REPL_BACKEND) +# MXS-2106: NULL values with avrorouter +add_test_executable(mxs2106_avro_null.cpp mxs2106_avro_null avro LABELS avrorouter REPL_BACKEND) + configure_file(templates.h.in ${CMAKE_CURRENT_BINARY_DIR}/templates.h @ONLY) diff --git a/maxscale-system-test/change_user.cpp b/maxscale-system-test/change_user.cpp index eccf90200..81dc39fa1 100644 --- a/maxscale-system-test/change_user.cpp +++ b/maxscale-system-test/change_user.cpp @@ -12,101 +12,58 @@ * - try INSERT again expecting success (user should not be changed) */ - -#include #include "testconnections.h" -int main(int argc, char* argv[]) +void run_test(TestConnections& test, MYSQL* conn) { - TestConnections* Test = new TestConnections(argc, argv); - Test->set_timeout(60); + test.expect(mysql_change_user(conn, "user", "pass2", "test") == 0, + "changing user failed: %s", mysql_error(conn)); - Test->repl->connect(); - Test->maxscales->connect_maxscale(0); + test.expect(execute_query_silent(conn, "INSERT INTO t1 VALUES (77, 11);") != 0, + "INSERT query succeeded without INSERT privilege"); - Test->tprintf("Creating user 'user' \n"); + test.expect(mysql_change_user(conn, test.repl->user_name, test.repl->password, "test") == 0, + "changing user failed: %s", mysql_error(conn)); - execute_query(Test->maxscales->conn_rwsplit[0], "DROP USER 'user'@'%%'"); - Test->try_query(Test->maxscales->conn_rwsplit[0], (char*) "CREATE USER user@'%%' identified by 'pass2'"); - Test->try_query(Test->maxscales->conn_rwsplit[0], (char*) "GRANT SELECT ON test.* TO user@'%%'"); - Test->try_query(Test->maxscales->conn_rwsplit[0], (char*) "FLUSH PRIVILEGES;"); - Test->try_query(Test->maxscales->conn_rwsplit[0], (char*) "DROP TABLE IF EXISTS t1"); - Test->try_query(Test->maxscales->conn_rwsplit[0], (char*) "CREATE TABLE t1 (x1 int, fl int)"); + test.expect(execute_query_silent(conn, "INSERT INTO t1 VALUES (77, 11);") == 0, + "INSERT query succeeded without INSERT privilege"); - Test->maxscales->restart_maxscale(); - sleep(2); - Test->maxscales->connect_maxscale(0); - Test->tprintf("Changing user... \n"); - Test->add_result(mysql_change_user(Test->maxscales->conn_rwsplit[0], - (char*) "user", - (char*) "pass2", - (char*) "test"), - "changing user failed \n"); - Test->tprintf("mysql_error is %s\n", mysql_error(Test->maxscales->conn_rwsplit[0])); + test.expect(mysql_change_user(conn, "user", "wrong_pass2", "test") != 0, + "changing user with wrong password successed!"); - Test->tprintf("Trying INSERT (expecting access denied)... \n"); - if (execute_query(Test->maxscales->conn_rwsplit[0], (char*) "INSERT INTO t1 VALUES (77, 11);") == 0) - { - Test->add_result(1, "INSERT query succedded to user which does not have INSERT PRIVILEGES\n"); - } + test.expect(strstr(mysql_error(conn), "Access denied for user"), + "Wrong error message returned on failed authentication"); - Test->tprintf("Changing user back... \n"); - Test->add_result(mysql_change_user(Test->maxscales->conn_rwsplit[0], - Test->repl->user_name, - Test->repl->password, - (char*) "test"), - "changing user failed \n"); - - Test->tprintf("Trying INSERT (expecting success)... \n"); - Test->try_query(Test->maxscales->conn_rwsplit[0], (char*) "INSERT INTO t1 VALUES (77, 12);"); - - Test->tprintf("Changing user with wrong password... \n"); - if (mysql_change_user(Test->maxscales->conn_rwsplit[0], - (char*) "user", - (char*) "wrong_pass2", - (char*) "test") == 0) - { - Test->add_result(1, "changing user with wrong password successed! \n"); - } - Test->tprintf("%s\n", mysql_error(Test->maxscales->conn_rwsplit[0])); - if ((strstr(mysql_error(Test->maxscales->conn_rwsplit[0]), "Access denied for user")) == NULL) - { - Test->add_result(1, "There is no proper error message\n"); - } - - Test->tprintf("Trying INSERT again (expecting failure - change user should have failed)..."); - Test->add_result(!execute_query(Test->maxscales->conn_rwsplit[0], - (char*) "INSERT INTO t1 VALUES (77, 13);"), - "Query should fail, MaxScale should disconnect on auth failure"); - - Test->tprintf("Changing user with wrong password using ReadConn \n"); - if (mysql_change_user(Test->maxscales->conn_slave[0], - (char*) "user", - (char*) "wrong_pass2", - (char*) "test") == 0) - { - Test->add_result(1, "FAILED: changing user with wrong password successed! \n"); - } - Test->tprintf("%s\n", mysql_error(Test->maxscales->conn_slave[0])); - if ((strstr(mysql_error(Test->maxscales->conn_slave[0]), "Access denied for user")) == NULL) - { - Test->add_result(1, "There is no proper error message\n"); - } - - Test->tprintf("Changing user for ReadConn \n"); - Test->add_result(mysql_change_user(Test->maxscales->conn_slave[0], - (char*) "user", - (char*) "pass2", - (char*) "test"), - "changing user failed \n"); - - Test->maxscales->connect_maxscale(0); - Test->try_query(Test->maxscales->conn_rwsplit[0], (char*) "DROP USER user@'%%';"); - execute_query_silent(Test->maxscales->conn_rwsplit[0], "DROP TABLE test.t1"); - - Test->maxscales->close_maxscale_connections(0); - int rval = Test->global_result; - delete Test; - return rval; + test.expect(execute_query_silent(conn, "INSERT INTO t1 VALUES (77, 11);") != 0, + "Query should fail, MaxScale should disconnect on auth failure"); +} + +int main(int argc, char *argv[]) +{ + TestConnections test(argc, argv); + + test.repl->connect(); + execute_query(test.repl->nodes[0], "DROP USER 'user'@'%%'"); + test.try_query(test.repl->nodes[0], "CREATE USER user@'%%' identified by 'pass2'"); + test.try_query(test.repl->nodes[0], "GRANT SELECT ON test.* TO user@'%%'"); + test.try_query(test.repl->nodes[0], "FLUSH PRIVILEGES;"); + test.try_query(test.repl->nodes[0], "DROP TABLE IF EXISTS t1"); + test.try_query(test.repl->nodes[0], "CREATE TABLE t1 (x1 int, fl int)"); + test.repl->sync_slaves(); + test.repl->disconnect(); + + test.maxscales->connect(); + test.tprintf("Testing readwritesplit"); + run_test(test, test.maxscales->conn_rwsplit[0]); + test.tprintf("Testing readconnroute"); + run_test(test, test.maxscales->conn_master[0]); + test.maxscales->disconnect(); + + test.repl->connect(); + execute_query_silent(test.repl->nodes[0], "DROP USER user@'%%';"); + execute_query_silent(test.repl->nodes[0], "DROP TABLE test.t1"); + test.repl->disconnect(); + + return test.global_result; } diff --git a/maxscale-system-test/mxs2106_avro_null.cpp b/maxscale-system-test/mxs2106_avro_null.cpp new file mode 100644 index 000000000..4c8ad169d --- /dev/null +++ b/maxscale-system-test/mxs2106_avro_null.cpp @@ -0,0 +1,76 @@ +/** + * MXS-2106: Maxscale CDC JSON output does not respect null values + */ + +#include "testconnections.h" +#include + +int main(int argc, char** argv) +{ + TestConnections::skip_maxscale_start(true); + TestConnections::check_nodes(false); + TestConnections test(argc, argv); + + test.set_timeout(120); + test.replicate_from_master(0); + + test.repl->connect(); + execute_query(test.repl->nodes[0], + "CREATE OR REPLACE TABLE `test`.`test1` (" + " `test1_id` int(10) unsigned NOT NULL AUTO_INCREMENT," + " `some_id` int(10) unsigned DEFAULT NULL," + " `desc` varchar(50) DEFAULT NULL," + " `some_date` timestamp NULL DEFAULT NULL," + " `updated` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP," + " PRIMARY KEY (`test1_id`)" + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;" + "INSERT INTO test.test1(some_id,`desc`,some_date) VALUES (1,NULL,NULL), (NULL,'value1',NULL)," + "(NULL,NULL,NOW());" + "UPDATE test.test1 SET some_id = NULL, `desc` = 'value2', some_date = NOW() WHERE test1_id = 1;" + "UPDATE test.test1 SET some_id = 35, `desc` = NULL, some_date = NULL WHERE test1_id = 2;"); + + /** Give avrorouter some time to process the events */ + test.stop_timeout(); + sleep(10); + test.set_timeout(120); + + CDC::Connection conn(test.maxscales->IP[0], 4001, "skysql", "skysql"); + + test.expect(conn.connect("test.test1"), "Failed to connect"); + + auto check = [&](const std::string& name) { + static int i = 1; + CDC::SRow row = conn.read(); + + if (row) + { + test.expect(row->is_null(name), + "%d: `%s` is not null: %s", + i++, + name.c_str(), + row->value(name).c_str()); + } + else + { + test.tprintf("Error: %s", conn.error().c_str()); + } + }; + + // The three inserts + check("some_date"); + check("some_id"); + check("some_id"); + + // First update + check("desc"); + check("some_id"); + + // Second update + check("some_id"); + check("desc"); + + execute_query(test.repl->nodes[0], "DROP TABLE `test1`"); + test.repl->disconnect(); + + return test.global_result; +} diff --git a/server/modules/protocol/examples/cdc_schema.go b/server/modules/protocol/examples/cdc_schema.go index 9b05c7ac4..50d1d8985 100644 --- a/server/modules/protocol/examples/cdc_schema.go +++ b/server/modules/protocol/examples/cdc_schema.go @@ -46,7 +46,7 @@ The "user" and "password" flags are required. // Avro field type Field struct { Name string `json:"name"` - Type string `json:"type"` + Type []string `json:"type"` RealType string `json:"real_type"` Length int `json:"length"` } @@ -81,25 +81,23 @@ func (f *Field) ToAvroType() { switch f.Type { case "date", "datetime", "time", "timestamp", "year", "tinytext", "text", "mediumtext", "longtext", "char", "varchar": - f.Type = "string" + f.Type = ["null", "string"] f.Length, _ = strconv.Atoi(length_re.ReplaceAllString(orig, "$1")) case "enum", "set": - f.Type = "string" + f.Type = ["null", "string"] case "tinyblob", "blob", "mediumblob", "longblob", "binary", "varbinary": - f.Type = "bytes" + f.Type = ["null", "bytes"] case "int", "smallint", "mediumint", "integer", "tinyint", "short", "bit": - f.Type = "int" + f.Type = ["null", "int"] case "float": - f.Type = "float" + f.Type = ["null", "float"] case "double", "decimal": - f.Type = "double" - case "null": - f.Type = "null" + f.Type = ["null", "double"] case "long", "bigint": - f.Type = "long" + f.Type = ["null", "long"] default: LogObject(f) - f.Type = "string" + f.Type = ["null", "string"] } } diff --git a/server/modules/protocol/examples/cdc_schema.py b/server/modules/protocol/examples/cdc_schema.py index 72832dfd4..86009d09c 100755 --- a/server/modules/protocol/examples/cdc_schema.py +++ b/server/modules/protocol/examples/cdc_schema.py @@ -43,24 +43,23 @@ def parse_field(row): else: res["length"] = -1 + type = "string" + if name in ("date", "datetime", "time", "timestamp", "year", "tinytext", "text", "mediumtext", "longtext", "char", "varchar", "enum", "set"): - res["type"] = "string" + type = "string" elif name in ("tinyblob", "blob", "mediumblob", "longblob", "binary", "varbinary"): - res["type"] = "bytes" + type = "bytes" elif name in ("int", "smallint", "mediumint", "integer", "tinyint", "short", "bit"): - res["type"] = "int" + type = "int" elif name in ("float"): - res["type"] = "float" + type = "float" elif name in ("double", "decimal"): - res["type"] = "double" - elif name in ("null"): - res["type"] = "null" + type = "double" elif name in ("long", "bigint"): - res["type"] = "long" - else: - res["type"] = "string" + type = "long" + res["type"] = ["null", type] res["name"] = row[0].lower() diff --git a/server/modules/protocol/examples/requirements.txt b/server/modules/protocol/examples/requirements.txt new file mode 100644 index 000000000..d65438109 --- /dev/null +++ b/server/modules/protocol/examples/requirements.txt @@ -0,0 +1,2 @@ +mysql-connector-python +kafka-python diff --git a/server/modules/routing/avrorouter/avro_converter.cc b/server/modules/routing/avrorouter/avro_converter.cc index 7b90d5de5..a0c9bd4cc 100644 --- a/server/modules/routing/avrorouter/avro_converter.cc +++ b/server/modules/routing/avrorouter/avro_converter.cc @@ -232,10 +232,11 @@ char* json_new_schema_from_table(const STableMapEvent& map, const STableCreateEv json_array_append_new(array, json_pack_ex(&err, 0, - "{s:s, s:s, s:s, s:i}", + "{s:s, s:[s, s], s:s, s:i}", "name", create->columns[i].name.c_str(), "type", + "null", column_type_to_avro_type(map->column_types[i]), "real_type", create->columns[i].type.c_str(), @@ -449,23 +450,16 @@ void AvroConverter::column(int i, uint8_t* value, int len) void AvroConverter::column(int i) { set_active(i); - - if (column_is_blob(m_map->column_types[i])) - { - uint8_t nullvalue = 0; - avro_value_set_bytes(&m_field, &nullvalue, 1); - } - else - { - avro_value_set_null(&m_field); - } + avro_value_set_branch(&m_union_value, 0, &m_field); + avro_value_set_null(&m_field); } void AvroConverter::set_active(int i) { MXB_AT_DEBUG(int rc = ) avro_value_get_by_name(&m_record, m_create->columns[i].name.c_str(), - &m_field, + &m_union_value, NULL); mxb_assert(rc == 0); + avro_value_set_branch(&m_union_value, 1, &m_field); } diff --git a/server/modules/routing/avrorouter/avro_converter.hh b/server/modules/routing/avrorouter/avro_converter.hh index bf97e4a56..fdbab4241 100644 --- a/server/modules/routing/avrorouter/avro_converter.hh +++ b/server/modules/routing/avrorouter/avro_converter.hh @@ -65,6 +65,7 @@ private: avro_value_iface_t* m_writer_iface; avro_file_writer_t* m_avro_file; avro_value_t m_record; + avro_value_t m_union_value; avro_value_t m_field; std::string m_avrodir; AvroTables m_open_tables; diff --git a/server/modules/routing/avrorouter/avro_rbr.cc b/server/modules/routing/avrorouter/avro_rbr.cc index 093919931..3be5e8d7a 100644 --- a/server/modules/routing/avrorouter/avro_rbr.cc +++ b/server/modules/routing/avrorouter/avro_rbr.cc @@ -264,7 +264,10 @@ uint8_t* process_row_event_data(STableMapEvent map, { if (bit_is_set(columns_present, ncolumns, i)) { + avro_value_t field; + avro_value_set_branch(&union_value, 1, &field); npresent++; + if (bit_is_set(null_bitmap, ncolumns, i)) { sprintf(trace[i], "[%ld] NULL", i);