Add missing error handling to Avro file handling
Some of the JSON errors weren't handled which could cause problems when a malformed schema definition is read. Also added more error messages for situations when opening of the files fails.
This commit is contained in:
@ -126,21 +126,35 @@ MAXAVRO_SCHEMA* maxavro_schema_alloc(const char* json)
|
|||||||
if (schema)
|
if (schema)
|
||||||
{
|
{
|
||||||
json_t *field_arr = NULL;
|
json_t *field_arr = NULL;
|
||||||
json_unpack(schema, "{s:o}", "fields", &field_arr);
|
|
||||||
size_t arr_size = json_array_size(field_arr);
|
|
||||||
rval->fields = malloc(sizeof(MAXAVRO_SCHEMA_FIELD) * arr_size);
|
|
||||||
rval->num_fields = arr_size;
|
|
||||||
|
|
||||||
for (int i = 0; i < arr_size; i++)
|
if (json_unpack(schema, "{s:o}", "fields", &field_arr) == 0)
|
||||||
{
|
{
|
||||||
json_t *object = json_array_get(field_arr, i);
|
size_t arr_size = json_array_size(field_arr);
|
||||||
char *key;
|
rval->fields = malloc(sizeof(MAXAVRO_SCHEMA_FIELD) * arr_size);
|
||||||
json_t *value_obj;
|
rval->num_fields = arr_size;
|
||||||
|
|
||||||
json_unpack(object, "{s:s s:o}", "name", &key, "type", &value_obj);
|
for (int i = 0; i < arr_size; i++)
|
||||||
rval->fields[i].name = strdup(key);
|
{
|
||||||
rval->fields[i].type = unpack_to_type(value_obj, &rval->fields[i]);
|
json_t *object = json_array_get(field_arr, i);
|
||||||
|
char *key;
|
||||||
|
json_t *value_obj;
|
||||||
|
|
||||||
|
if (json_unpack(object, "{s:s s:o}", "name", &key, "type", &value_obj) == 0)
|
||||||
|
{
|
||||||
|
rval->fields[i].name = strdup(key);
|
||||||
|
rval->fields[i].type = unpack_to_type(value_obj, &rval->fields[i]);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
MXS_ERROR("Failed to unpack JSON Object \"name\": %s", json);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
MXS_ERROR("Failed to unpack JSON Object \"fields\": %s", json);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
json_decref(schema);
|
json_decref(schema);
|
||||||
}
|
}
|
||||||
|
@ -775,48 +775,54 @@ static bool avro_client_stream_data(AVRO_CLIENT *client)
|
|||||||
char filename[PATH_MAX + 1];
|
char filename[PATH_MAX + 1];
|
||||||
snprintf(filename, PATH_MAX, "%s/%s", router->avrodir, client->avro_binfile);
|
snprintf(filename, PATH_MAX, "%s/%s", router->avrodir, client->avro_binfile);
|
||||||
|
|
||||||
|
bool ok = true;
|
||||||
|
|
||||||
spinlock_acquire(&client->file_lock);
|
spinlock_acquire(&client->file_lock);
|
||||||
if (client->file_handle == NULL)
|
if (client->file_handle == NULL &&
|
||||||
|
(client->file_handle = maxavro_file_open(filename)) == NULL)
|
||||||
{
|
{
|
||||||
client->file_handle = maxavro_file_open(filename);
|
ok = false;
|
||||||
}
|
}
|
||||||
spinlock_release(&client->file_lock);
|
spinlock_release(&client->file_lock);
|
||||||
|
|
||||||
switch (client->format)
|
if (ok)
|
||||||
{
|
{
|
||||||
case AVRO_FORMAT_JSON:
|
switch (client->format)
|
||||||
/** Currently only JSON format supports seeking to a GTID */
|
{
|
||||||
if (client->requested_gtid &&
|
case AVRO_FORMAT_JSON:
|
||||||
seek_to_index_pos(client, client->file_handle) &&
|
/** Currently only JSON format supports seeking to a GTID */
|
||||||
seek_to_gtid(client, client->file_handle))
|
if (client->requested_gtid &&
|
||||||
{
|
seek_to_index_pos(client, client->file_handle) &&
|
||||||
client->requested_gtid = false;
|
seek_to_gtid(client, client->file_handle))
|
||||||
}
|
{
|
||||||
|
client->requested_gtid = false;
|
||||||
|
}
|
||||||
|
|
||||||
read_more = stream_json(client);
|
read_more = stream_json(client);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case AVRO_FORMAT_AVRO:
|
case AVRO_FORMAT_AVRO:
|
||||||
read_more = stream_binary(client);
|
read_more = stream_binary(client);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
default:
|
default:
|
||||||
MXS_ERROR("Unexpected format: %d", client->format);
|
MXS_ERROR("Unexpected format: %d", client->format);
|
||||||
break;
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
if (maxavro_get_error(client->file_handle) != MAXAVRO_ERR_NONE)
|
||||||
|
{
|
||||||
|
MXS_ERROR("Reading Avro file failed with error '%s'.",
|
||||||
|
maxavro_get_error_string(client->file_handle));
|
||||||
|
}
|
||||||
|
|
||||||
|
/* update client struct */
|
||||||
|
memcpy(&client->avro_file, client->file_handle, sizeof(client->avro_file));
|
||||||
|
|
||||||
|
/* may be just use client->avro_file->records_read and remove this var */
|
||||||
|
client->last_sent_pos = client->avro_file.records_read;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
if (maxavro_get_error(client->file_handle) != MAXAVRO_ERR_NONE)
|
|
||||||
{
|
|
||||||
MXS_ERROR("Reading Avro file failed with error '%s'.",
|
|
||||||
maxavro_get_error_string(client->file_handle));
|
|
||||||
}
|
|
||||||
|
|
||||||
/* update client struct */
|
|
||||||
memcpy(&client->avro_file, client->file_handle, sizeof(client->avro_file));
|
|
||||||
|
|
||||||
/* may be just use client->avro_file->records_read and remove this var */
|
|
||||||
client->last_sent_pos = client->avro_file.records_read;
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -72,6 +72,7 @@ int index_query_cb(void *data, int rows, char** values, char** names)
|
|||||||
void avro_index_file(AVRO_INSTANCE *router, const char* filename)
|
void avro_index_file(AVRO_INSTANCE *router, const char* filename)
|
||||||
{
|
{
|
||||||
MAXAVRO_FILE *file = maxavro_file_open(filename);
|
MAXAVRO_FILE *file = maxavro_file_open(filename);
|
||||||
|
|
||||||
if (file)
|
if (file)
|
||||||
{
|
{
|
||||||
char *name = strrchr(filename, '/');
|
char *name = strrchr(filename, '/');
|
||||||
@ -165,6 +166,10 @@ void avro_index_file(AVRO_INSTANCE *router, const char* filename)
|
|||||||
|
|
||||||
maxavro_file_close(file);
|
maxavro_file_close(file);
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
MXS_ERROR("Failed to open file '%s' when generating file index.", filename);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
Reference in New Issue
Block a user