Merge branch '2.2' into 2.3

This commit is contained in:
Markus Mäkelä 2018-11-06 21:12:20 +02:00
commit 383b0b1989
No known key found for this signature in database
GPG Key ID: 72D48FCE664F7B19
16 changed files with 232 additions and 138 deletions

View File

@ -40,6 +40,7 @@ enum maxavro_value_type
MAXAVRO_TYPE_BYTES,
MAXAVRO_TYPE_ENUM,
MAXAVRO_TYPE_NULL,
MAXAVRO_TYPE_UNION,
MAXAVRO_TYPE_MAX
};

View File

@ -50,7 +50,12 @@ bool maxavro_datablock_finalize(MAXAVRO_DATABLOCK* block);
/** Adding values to a datablock. The caller must ensure that the inserted
* values conform to the file schema and that the required amount of fields
* is added before finalizing the block. */
bool maxavro_datablock_add_integer(MAXAVRO_DATABLOCK* file, uint64_t val);
bool maxavro_datablock_add_string(MAXAVRO_DATABLOCK* file, const char* str);
bool maxavro_datablock_add_float(MAXAVRO_DATABLOCK* file, float val);
bool maxavro_datablock_add_double(MAXAVRO_DATABLOCK* file, double val);
bool maxavro_datablock_add_integer(MAXAVRO_DATABLOCK *file, uint64_t val);
bool maxavro_datablock_add_string(MAXAVRO_DATABLOCK *file, const char* str);
bool maxavro_datablock_add_float(MAXAVRO_DATABLOCK *file, float val);
bool maxavro_datablock_add_double(MAXAVRO_DATABLOCK *file, double val);
bool maxavro_read_datablock_start(MAXAVRO_FILE *file);
bool maxavro_verify_block(MAXAVRO_FILE *file);
const char* type_to_string(enum maxavro_value_type type);
enum maxavro_value_type string_to_type(const char *str);

View File

@ -18,10 +18,6 @@
#include <maxscale/log.h>
#include <errno.h>
bool maxavro_read_datablock_start(MAXAVRO_FILE* file);
bool maxavro_verify_block(MAXAVRO_FILE* file);
const char* type_to_string(enum maxavro_value_type type);
/**
* @brief Read a single value from a file
* @param file File to read from
@ -30,10 +26,10 @@ const char* type_to_string(enum maxavro_value_type type);
* @param field_num Field index in the schema
* @return JSON object or NULL if an error occurred
*/
static json_t* read_and_pack_value(MAXAVRO_FILE* file, MAXAVRO_SCHEMA_FIELD* field)
static json_t* read_and_pack_value(MAXAVRO_FILE *file, MAXAVRO_SCHEMA_FIELD *field, enum maxavro_value_type type)
{
json_t* value = NULL;
switch (field->type)
switch (type)
{
case MAXAVRO_TYPE_BOOL:
if (file->buffer_ptr < file->buffer_end)
@ -109,6 +105,23 @@ static json_t* read_and_pack_value(MAXAVRO_FILE* file, MAXAVRO_SCHEMA_FIELD* fie
}
break;
case MAXAVRO_TYPE_UNION:
{
json_t *arr = field->extra;
uint64_t val = 0;
if (maxavro_read_integer(file, &val) && val < json_array_size(arr))
{
json_t* union_type = json_object_get(json_array_get(arr, val), "type");
value = read_and_pack_value(file, field, string_to_type(json_string_value(union_type)));
}
}
break;
case MAXAVRO_TYPE_NULL:
value = json_null();
break;
default:
MXS_ERROR("Unimplemented type: %d", field->type);
break;
@ -174,7 +187,7 @@ json_t* maxavro_record_read_json(MAXAVRO_FILE* file)
{
for (size_t i = 0; i < file->schema->num_fields; i++)
{
json_t* value = read_and_pack_value(file, &file->schema->fields[i]);
json_t* value = read_and_pack_value(file, &file->schema->fields[i], file->schema->fields[i].type);
if (value)
{
json_object_set_new(object, file->schema->fields[i].name, value);

View File

@ -78,6 +78,13 @@ static enum maxavro_value_type unpack_to_type(json_t* object,
enum maxavro_value_type rval = MAXAVRO_TYPE_UNKNOWN;
json_t* type = NULL;
if (json_is_array(object) && json_is_object(json_array_get(object, 0)))
{
json_incref(object);
field->extra = object;
return MAXAVRO_TYPE_UNION;
}
if (json_is_object(object))
{
json_t* tmp = NULL;
@ -191,7 +198,7 @@ static void maxavro_schema_field_free(MAXAVRO_SCHEMA_FIELD* field)
if (field)
{
MXS_FREE(field->name);
if (field->type == MAXAVRO_TYPE_ENUM)
if (field->type == MAXAVRO_TYPE_ENUM || field->type == MAXAVRO_TYPE_UNION)
{
json_decref((json_t*)field->extra);
}

View File

@ -2,7 +2,7 @@
add_library(cdc_connector SHARED cdc_connector.cpp)
add_dependencies(cdc_connector jansson)
target_link_libraries(cdc_connector ${JANSSON_LIBRARIES} crypto)
set_target_properties(cdc_connector PROPERTIES VERSION "1.0.0")
set_target_properties(cdc_connector PROPERTIES VERSION "1.0.1")
add_dependencies(cdc_connector jansson)
# Static version of the library

View File

@ -362,6 +362,7 @@ void Connection::process_schema(json_t* json)
SRow Connection::process_row(json_t* js)
{
std::set<size_t> nulls;
ValueVector values;
values.reserve(m_keys->size());
m_error.clear();
@ -373,6 +374,11 @@ SRow Connection::process_row(json_t* js)
if (v)
{
if (json_is_null(v))
{
nulls.insert(values.size());
}
values.push_back(json_to_string(v));
}
else
@ -387,7 +393,7 @@ SRow Connection::process_row(json_t* js)
if (m_error.empty())
{
rval = SRow(new Row(m_keys, m_types, values));
rval = SRow(new Row(m_keys, m_types, values, nulls));
}
return rval;

View File

@ -23,6 +23,7 @@
#include <vector>
#include <deque>
#include <map>
#include <set>
#include <algorithm>
#include <jansson.h>
@ -199,6 +200,31 @@ public:
return m_values.at(it - m_keys->begin());
}
/**
* Check if a field has a NULL value
*
* @param i The field index
*
* @return True if the field has a NULL value
*/
bool is_null(size_t i) const
{
return m_nulls.count(i);
}
/**
* Check if a field has a NULL value
*
* @param str The field name
*
* @return True if the field has a NULL value
*/
bool is_null(const std::string& str) const
{
ValueVector::const_iterator it = std::find(m_keys->begin(), m_keys->end(), str);
return m_nulls.count(it - m_keys->begin());
}
/**
* Get the GTID of this row
*
@ -242,16 +268,19 @@ public:
private:
SValueVector m_keys;
SValueVector m_types;
ValueVector m_values;
ValueVector m_values;
std::set<size_t> m_nulls;
// Only a Connection should construct an InternalRow
friend class Connection;
Row(SValueVector& keys,
SValueVector& types,
ValueVector& values)
: m_keys(keys)
, m_types(types)
ValueVector& values,
std::set<size_t>& nulls):
m_keys(keys),
m_types(types),
m_nulls(nulls)
{
m_values.swap(values);
}

View File

@ -1042,4 +1042,7 @@ add_test_executable(mxs2111_auth_string.cpp mxs2111_auth_string replication LABE
# MXS-2115: Automatic version_string detection
add_test_executable(mxs2115_version_string.cpp mxs2115_version_string replication LABELS REPL_BACKEND)
# MXS-2106: NULL values with avrorouter
add_test_executable(mxs2106_avro_null.cpp mxs2106_avro_null avro LABELS avrorouter REPL_BACKEND)
configure_file(templates.h.in ${CMAKE_CURRENT_BINARY_DIR}/templates.h @ONLY)

View File

@ -12,101 +12,58 @@
* - try INSERT again expecting success (user should not be changed)
*/
#include <iostream>
#include "testconnections.h"
int main(int argc, char* argv[])
void run_test(TestConnections& test, MYSQL* conn)
{
TestConnections* Test = new TestConnections(argc, argv);
Test->set_timeout(60);
test.expect(mysql_change_user(conn, "user", "pass2", "test") == 0,
"changing user failed: %s", mysql_error(conn));
Test->repl->connect();
Test->maxscales->connect_maxscale(0);
test.expect(execute_query_silent(conn, "INSERT INTO t1 VALUES (77, 11);") != 0,
"INSERT query succeeded without INSERT privilege");
Test->tprintf("Creating user 'user' \n");
test.expect(mysql_change_user(conn, test.repl->user_name, test.repl->password, "test") == 0,
"changing user failed: %s", mysql_error(conn));
execute_query(Test->maxscales->conn_rwsplit[0], "DROP USER 'user'@'%%'");
Test->try_query(Test->maxscales->conn_rwsplit[0], (char*) "CREATE USER user@'%%' identified by 'pass2'");
Test->try_query(Test->maxscales->conn_rwsplit[0], (char*) "GRANT SELECT ON test.* TO user@'%%'");
Test->try_query(Test->maxscales->conn_rwsplit[0], (char*) "FLUSH PRIVILEGES;");
Test->try_query(Test->maxscales->conn_rwsplit[0], (char*) "DROP TABLE IF EXISTS t1");
Test->try_query(Test->maxscales->conn_rwsplit[0], (char*) "CREATE TABLE t1 (x1 int, fl int)");
test.expect(execute_query_silent(conn, "INSERT INTO t1 VALUES (77, 11);") == 0,
"INSERT query succeeded without INSERT privilege");
Test->maxscales->restart_maxscale();
sleep(2);
Test->maxscales->connect_maxscale(0);
Test->tprintf("Changing user... \n");
Test->add_result(mysql_change_user(Test->maxscales->conn_rwsplit[0],
(char*) "user",
(char*) "pass2",
(char*) "test"),
"changing user failed \n");
Test->tprintf("mysql_error is %s\n", mysql_error(Test->maxscales->conn_rwsplit[0]));
test.expect(mysql_change_user(conn, "user", "wrong_pass2", "test") != 0,
"changing user with wrong password successed!");
Test->tprintf("Trying INSERT (expecting access denied)... \n");
if (execute_query(Test->maxscales->conn_rwsplit[0], (char*) "INSERT INTO t1 VALUES (77, 11);") == 0)
{
Test->add_result(1, "INSERT query succedded to user which does not have INSERT PRIVILEGES\n");
}
test.expect(strstr(mysql_error(conn), "Access denied for user"),
"Wrong error message returned on failed authentication");
Test->tprintf("Changing user back... \n");
Test->add_result(mysql_change_user(Test->maxscales->conn_rwsplit[0],
Test->repl->user_name,
Test->repl->password,
(char*) "test"),
"changing user failed \n");
Test->tprintf("Trying INSERT (expecting success)... \n");
Test->try_query(Test->maxscales->conn_rwsplit[0], (char*) "INSERT INTO t1 VALUES (77, 12);");
Test->tprintf("Changing user with wrong password... \n");
if (mysql_change_user(Test->maxscales->conn_rwsplit[0],
(char*) "user",
(char*) "wrong_pass2",
(char*) "test") == 0)
{
Test->add_result(1, "changing user with wrong password successed! \n");
}
Test->tprintf("%s\n", mysql_error(Test->maxscales->conn_rwsplit[0]));
if ((strstr(mysql_error(Test->maxscales->conn_rwsplit[0]), "Access denied for user")) == NULL)
{
Test->add_result(1, "There is no proper error message\n");
}
Test->tprintf("Trying INSERT again (expecting failure - change user should have failed)...");
Test->add_result(!execute_query(Test->maxscales->conn_rwsplit[0],
(char*) "INSERT INTO t1 VALUES (77, 13);"),
"Query should fail, MaxScale should disconnect on auth failure");
Test->tprintf("Changing user with wrong password using ReadConn \n");
if (mysql_change_user(Test->maxscales->conn_slave[0],
(char*) "user",
(char*) "wrong_pass2",
(char*) "test") == 0)
{
Test->add_result(1, "FAILED: changing user with wrong password successed! \n");
}
Test->tprintf("%s\n", mysql_error(Test->maxscales->conn_slave[0]));
if ((strstr(mysql_error(Test->maxscales->conn_slave[0]), "Access denied for user")) == NULL)
{
Test->add_result(1, "There is no proper error message\n");
}
Test->tprintf("Changing user for ReadConn \n");
Test->add_result(mysql_change_user(Test->maxscales->conn_slave[0],
(char*) "user",
(char*) "pass2",
(char*) "test"),
"changing user failed \n");
Test->maxscales->connect_maxscale(0);
Test->try_query(Test->maxscales->conn_rwsplit[0], (char*) "DROP USER user@'%%';");
execute_query_silent(Test->maxscales->conn_rwsplit[0], "DROP TABLE test.t1");
Test->maxscales->close_maxscale_connections(0);
int rval = Test->global_result;
delete Test;
return rval;
test.expect(execute_query_silent(conn, "INSERT INTO t1 VALUES (77, 11);") != 0,
"Query should fail, MaxScale should disconnect on auth failure");
}
int main(int argc, char *argv[])
{
TestConnections test(argc, argv);
test.repl->connect();
execute_query(test.repl->nodes[0], "DROP USER 'user'@'%%'");
test.try_query(test.repl->nodes[0], "CREATE USER user@'%%' identified by 'pass2'");
test.try_query(test.repl->nodes[0], "GRANT SELECT ON test.* TO user@'%%'");
test.try_query(test.repl->nodes[0], "FLUSH PRIVILEGES;");
test.try_query(test.repl->nodes[0], "DROP TABLE IF EXISTS t1");
test.try_query(test.repl->nodes[0], "CREATE TABLE t1 (x1 int, fl int)");
test.repl->sync_slaves();
test.repl->disconnect();
test.maxscales->connect();
test.tprintf("Testing readwritesplit");
run_test(test, test.maxscales->conn_rwsplit[0]);
test.tprintf("Testing readconnroute");
run_test(test, test.maxscales->conn_master[0]);
test.maxscales->disconnect();
test.repl->connect();
execute_query_silent(test.repl->nodes[0], "DROP USER user@'%%';");
execute_query_silent(test.repl->nodes[0], "DROP TABLE test.t1");
test.repl->disconnect();
return test.global_result;
}

View File

@ -0,0 +1,76 @@
/**
* MXS-2106: Maxscale CDC JSON output does not respect null values
*/
#include "testconnections.h"
#include <cdc_connector.h>
int main(int argc, char** argv)
{
TestConnections::skip_maxscale_start(true);
TestConnections::check_nodes(false);
TestConnections test(argc, argv);
test.set_timeout(120);
test.replicate_from_master(0);
test.repl->connect();
execute_query(test.repl->nodes[0],
"CREATE OR REPLACE TABLE `test`.`test1` ("
" `test1_id` int(10) unsigned NOT NULL AUTO_INCREMENT,"
" `some_id` int(10) unsigned DEFAULT NULL,"
" `desc` varchar(50) DEFAULT NULL,"
" `some_date` timestamp NULL DEFAULT NULL,"
" `updated` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,"
" PRIMARY KEY (`test1_id`)"
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;"
"INSERT INTO test.test1(some_id,`desc`,some_date) VALUES (1,NULL,NULL), (NULL,'value1',NULL),"
"(NULL,NULL,NOW());"
"UPDATE test.test1 SET some_id = NULL, `desc` = 'value2', some_date = NOW() WHERE test1_id = 1;"
"UPDATE test.test1 SET some_id = 35, `desc` = NULL, some_date = NULL WHERE test1_id = 2;");
/** Give avrorouter some time to process the events */
test.stop_timeout();
sleep(10);
test.set_timeout(120);
CDC::Connection conn(test.maxscales->IP[0], 4001, "skysql", "skysql");
test.expect(conn.connect("test.test1"), "Failed to connect");
auto check = [&](const std::string& name) {
static int i = 1;
CDC::SRow row = conn.read();
if (row)
{
test.expect(row->is_null(name),
"%d: `%s` is not null: %s",
i++,
name.c_str(),
row->value(name).c_str());
}
else
{
test.tprintf("Error: %s", conn.error().c_str());
}
};
// The three inserts
check("some_date");
check("some_id");
check("some_id");
// First update
check("desc");
check("some_id");
// Second update
check("some_id");
check("desc");
execute_query(test.repl->nodes[0], "DROP TABLE `test1`");
test.repl->disconnect();
return test.global_result;
}

View File

@ -46,7 +46,7 @@ The "user" and "password" flags are required.
// Avro field
type Field struct {
Name string `json:"name"`
Type string `json:"type"`
Type []string `json:"type"`
RealType string `json:"real_type"`
Length int `json:"length"`
}
@ -81,25 +81,23 @@ func (f *Field) ToAvroType() {
switch f.Type {
case "date", "datetime", "time", "timestamp", "year", "tinytext", "text",
"mediumtext", "longtext", "char", "varchar":
f.Type = "string"
f.Type = ["null", "string"]
f.Length, _ = strconv.Atoi(length_re.ReplaceAllString(orig, "$1"))
case "enum", "set":
f.Type = "string"
f.Type = ["null", "string"]
case "tinyblob", "blob", "mediumblob", "longblob", "binary", "varbinary":
f.Type = "bytes"
f.Type = ["null", "bytes"]
case "int", "smallint", "mediumint", "integer", "tinyint", "short", "bit":
f.Type = "int"
f.Type = ["null", "int"]
case "float":
f.Type = "float"
f.Type = ["null", "float"]
case "double", "decimal":
f.Type = "double"
case "null":
f.Type = "null"
f.Type = ["null", "double"]
case "long", "bigint":
f.Type = "long"
f.Type = ["null", "long"]
default:
LogObject(f)
f.Type = "string"
f.Type = ["null", "string"]
}
}

View File

@ -43,24 +43,23 @@ def parse_field(row):
else:
res["length"] = -1
type = "string"
if name in ("date", "datetime", "time", "timestamp", "year", "tinytext", "text",
"mediumtext", "longtext", "char", "varchar", "enum", "set"):
res["type"] = "string"
type = "string"
elif name in ("tinyblob", "blob", "mediumblob", "longblob", "binary", "varbinary"):
res["type"] = "bytes"
type = "bytes"
elif name in ("int", "smallint", "mediumint", "integer", "tinyint", "short", "bit"):
res["type"] = "int"
type = "int"
elif name in ("float"):
res["type"] = "float"
type = "float"
elif name in ("double", "decimal"):
res["type"] = "double"
elif name in ("null"):
res["type"] = "null"
type = "double"
elif name in ("long", "bigint"):
res["type"] = "long"
else:
res["type"] = "string"
type = "long"
res["type"] = ["null", type]
res["name"] = row[0].lower()

View File

@ -0,0 +1,2 @@
mysql-connector-python
kafka-python

View File

@ -232,10 +232,11 @@ char* json_new_schema_from_table(const STableMapEvent& map, const STableCreateEv
json_array_append_new(array,
json_pack_ex(&err,
0,
"{s:s, s:s, s:s, s:i}",
"{s:s, s:[s, s], s:s, s:i}",
"name",
create->columns[i].name.c_str(),
"type",
"null",
column_type_to_avro_type(map->column_types[i]),
"real_type",
create->columns[i].type.c_str(),
@ -449,23 +450,16 @@ void AvroConverter::column(int i, uint8_t* value, int len)
void AvroConverter::column(int i)
{
set_active(i);
if (column_is_blob(m_map->column_types[i]))
{
uint8_t nullvalue = 0;
avro_value_set_bytes(&m_field, &nullvalue, 1);
}
else
{
avro_value_set_null(&m_field);
}
avro_value_set_branch(&m_union_value, 0, &m_field);
avro_value_set_null(&m_field);
}
void AvroConverter::set_active(int i)
{
MXB_AT_DEBUG(int rc = ) avro_value_get_by_name(&m_record,
m_create->columns[i].name.c_str(),
&m_field,
&m_union_value,
NULL);
mxb_assert(rc == 0);
avro_value_set_branch(&m_union_value, 1, &m_field);
}

View File

@ -65,6 +65,7 @@ private:
avro_value_iface_t* m_writer_iface;
avro_file_writer_t* m_avro_file;
avro_value_t m_record;
avro_value_t m_union_value;
avro_value_t m_field;
std::string m_avrodir;
AvroTables m_open_tables;

View File

@ -264,7 +264,10 @@ uint8_t* process_row_event_data(STableMapEvent map,
{
if (bit_is_set(columns_present, ncolumns, i))
{
avro_value_t field;
avro_value_set_branch(&union_value, 1, &field);
npresent++;
if (bit_is_set(null_bitmap, ncolumns, i))
{
sprintf(trace[i], "[%ld] NULL", i);