MXS-2106: Fix NULL value handling
NULL values were not stored as NULL Avro values because the file format has no native NULL-ness for the basic types. To solve this, all values must be stored as a union that contains the actual type as well as the null type. Unions were not implemented in the maxavro library, but implementing them simply means recursing one level down.
This commit is contained in:
@ -39,6 +39,7 @@ enum maxavro_value_type
|
||||
MAXAVRO_TYPE_BYTES,
|
||||
MAXAVRO_TYPE_ENUM,
|
||||
MAXAVRO_TYPE_NULL,
|
||||
MAXAVRO_TYPE_UNION,
|
||||
MAXAVRO_TYPE_MAX
|
||||
};
|
||||
|
||||
|
@ -54,3 +54,8 @@ bool maxavro_datablock_add_integer(MAXAVRO_DATABLOCK *file, uint64_t val);
|
||||
bool maxavro_datablock_add_string(MAXAVRO_DATABLOCK *file, const char* str);
|
||||
bool maxavro_datablock_add_float(MAXAVRO_DATABLOCK *file, float val);
|
||||
bool maxavro_datablock_add_double(MAXAVRO_DATABLOCK *file, double val);
|
||||
|
||||
bool maxavro_read_datablock_start(MAXAVRO_FILE *file);
|
||||
bool maxavro_verify_block(MAXAVRO_FILE *file);
|
||||
const char* type_to_string(enum maxavro_value_type type);
|
||||
enum maxavro_value_type string_to_type(const char *str);
|
||||
|
@ -18,10 +18,6 @@
|
||||
#include <maxscale/log_manager.h>
|
||||
#include <errno.h>
|
||||
|
||||
bool maxavro_read_datablock_start(MAXAVRO_FILE *file);
|
||||
bool maxavro_verify_block(MAXAVRO_FILE *file);
|
||||
const char* type_to_string(enum maxavro_value_type type);
|
||||
|
||||
/**
|
||||
* @brief Read a single value from a file
|
||||
* @param file File to read from
|
||||
@ -30,10 +26,10 @@ const char* type_to_string(enum maxavro_value_type type);
|
||||
* @param field_num Field index in the schema
|
||||
* @return JSON object or NULL if an error occurred
|
||||
*/
|
||||
static json_t* read_and_pack_value(MAXAVRO_FILE *file, MAXAVRO_SCHEMA_FIELD *field)
|
||||
static json_t* read_and_pack_value(MAXAVRO_FILE *file, MAXAVRO_SCHEMA_FIELD *field, enum maxavro_value_type type)
|
||||
{
|
||||
json_t* value = NULL;
|
||||
switch (field->type)
|
||||
switch (type)
|
||||
{
|
||||
case MAXAVRO_TYPE_BOOL:
|
||||
if (file->buffer_ptr < file->buffer_end)
|
||||
@ -108,6 +104,23 @@ static json_t* read_and_pack_value(MAXAVRO_FILE *file, MAXAVRO_SCHEMA_FIELD *fie
|
||||
}
|
||||
break;
|
||||
|
||||
case MAXAVRO_TYPE_UNION:
|
||||
{
|
||||
json_t *arr = field->extra;
|
||||
uint64_t val = 0;
|
||||
|
||||
if (maxavro_read_integer(file, &val) && val < json_array_size(arr))
|
||||
{
|
||||
json_t* union_type = json_object_get(json_array_get(arr, val), "type");
|
||||
value = read_and_pack_value(file, field, string_to_type(json_string_value(union_type)));
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
case MAXAVRO_TYPE_NULL:
|
||||
value = json_null();
|
||||
break;
|
||||
|
||||
default:
|
||||
MXS_ERROR("Unimplemented type: %d", field->type);
|
||||
break;
|
||||
@ -173,7 +186,7 @@ json_t* maxavro_record_read_json(MAXAVRO_FILE *file)
|
||||
{
|
||||
for (size_t i = 0; i < file->schema->num_fields; i++)
|
||||
{
|
||||
json_t* value = read_and_pack_value(file, &file->schema->fields[i]);
|
||||
json_t* value = read_and_pack_value(file, &file->schema->fields[i], file->schema->fields[i].type);
|
||||
if (value)
|
||||
{
|
||||
json_object_set_new(object, file->schema->fields[i].name, value);
|
||||
|
@ -78,6 +78,13 @@ static enum maxavro_value_type unpack_to_type(json_t *object,
|
||||
enum maxavro_value_type rval = MAXAVRO_TYPE_UNKNOWN;
|
||||
json_t* type = NULL;
|
||||
|
||||
if (json_is_array(object) && json_is_object(json_array_get(object, 0)))
|
||||
{
|
||||
json_incref(object);
|
||||
field->extra = object;
|
||||
return MAXAVRO_TYPE_UNION;
|
||||
}
|
||||
|
||||
if (json_is_object(object))
|
||||
{
|
||||
json_t *tmp = NULL;
|
||||
@ -191,7 +198,7 @@ static void maxavro_schema_field_free(MAXAVRO_SCHEMA_FIELD *field)
|
||||
if (field)
|
||||
{
|
||||
MXS_FREE(field->name);
|
||||
if (field->type == MAXAVRO_TYPE_ENUM)
|
||||
if (field->type == MAXAVRO_TYPE_ENUM || field->type == MAXAVRO_TYPE_UNION)
|
||||
{
|
||||
json_decref((json_t*)field->extra);
|
||||
}
|
||||
|
@ -561,7 +561,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
|
||||
uint8_t *ptr, uint8_t *columns_present, uint8_t *end)
|
||||
{
|
||||
int npresent = 0;
|
||||
avro_value_t field;
|
||||
avro_value_t union_value;
|
||||
long ncolumns = map->columns;
|
||||
uint8_t *metadata = map->column_metadata;
|
||||
size_t metadata_offset = 0;
|
||||
@ -580,25 +580,21 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
|
||||
|
||||
for (long i = 0; i < ncolumns && npresent < ncolumns; i++)
|
||||
{
|
||||
ss_debug(int rc = )avro_value_get_by_name(record, create->column_names[i], &field, NULL);
|
||||
ss_debug(int rc = )avro_value_get_by_name(record, create->column_names[i], &union_value, NULL);
|
||||
ss_dassert(rc == 0);
|
||||
|
||||
if (bit_is_set(columns_present, ncolumns, i))
|
||||
{
|
||||
avro_value_t field;
|
||||
avro_value_set_branch(&union_value, 1, &field);
|
||||
npresent++;
|
||||
|
||||
if (bit_is_set(null_bitmap, ncolumns, i))
|
||||
{
|
||||
sprintf(trace[i], "[%ld] NULL", i);
|
||||
if (column_is_blob(map->column_types[i]))
|
||||
{
|
||||
uint8_t nullvalue = 0;
|
||||
avro_value_set_bytes(&field, &nullvalue, 1);
|
||||
}
|
||||
else
|
||||
{
|
||||
avro_value_set_branch(&union_value, 0, &field);
|
||||
avro_value_set_null(&field);
|
||||
}
|
||||
}
|
||||
else if (column_is_fixed_string(map->column_types[i]))
|
||||
{
|
||||
/** ENUM and SET are stored as STRING types with the type stored
|
||||
|
@ -130,9 +130,9 @@ char* json_new_schema_from_table(TABLE_MAP *map)
|
||||
ss_info_dassert(create->column_types[i] && *create->column_types[i],
|
||||
"Column type should not be empty or NULL");
|
||||
|
||||
json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s, s:s, s:i}",
|
||||
json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:[s, s], s:s, s:i}",
|
||||
"name", create->column_names[i],
|
||||
"type", column_type_to_avro_type(map->column_types[i]),
|
||||
"type", "null", column_type_to_avro_type(map->column_types[i]),
|
||||
"real_type", create->column_types[i],
|
||||
"length", create->column_lengths[i]));
|
||||
}
|
||||
|
Reference in New Issue
Block a user