diff --git a/avro/maxavro_schema.c b/avro/maxavro_schema.c index d26ea75e6..257274696 100644 --- a/avro/maxavro_schema.c +++ b/avro/maxavro_schema.c @@ -126,21 +126,35 @@ MAXAVRO_SCHEMA* maxavro_schema_alloc(const char* json) if (schema) { json_t *field_arr = NULL; - json_unpack(schema, "{s:o}", "fields", &field_arr); - size_t arr_size = json_array_size(field_arr); - rval->fields = malloc(sizeof(MAXAVRO_SCHEMA_FIELD) * arr_size); - rval->num_fields = arr_size; - for (int i = 0; i < arr_size; i++) + if (json_unpack(schema, "{s:o}", "fields", &field_arr) == 0) { - json_t *object = json_array_get(field_arr, i); - char *key; - json_t *value_obj; + size_t arr_size = json_array_size(field_arr); + rval->fields = malloc(sizeof(MAXAVRO_SCHEMA_FIELD) * arr_size); + rval->num_fields = arr_size; - json_unpack(object, "{s:s s:o}", "name", &key, "type", &value_obj); - rval->fields[i].name = strdup(key); - rval->fields[i].type = unpack_to_type(value_obj, &rval->fields[i]); + for (int i = 0; i < arr_size; i++) + { + json_t *object = json_array_get(field_arr, i); + char *key; + json_t *value_obj; + + if (json_unpack(object, "{s:s s:o}", "name", &key, "type", &value_obj) == 0) + { + rval->fields[i].name = strdup(key); + rval->fields[i].type = unpack_to_type(value_obj, &rval->fields[i]); + } + else + { + MXS_ERROR("Failed to unpack JSON Object \"name\": %s", json); + } + } } + else + { + MXS_ERROR("Failed to unpack JSON Object \"fields\": %s", json); + } + json_decref(schema); } diff --git a/server/modules/routing/avro/avro_client.c b/server/modules/routing/avro/avro_client.c index e7873f953..90cd9e268 100644 --- a/server/modules/routing/avro/avro_client.c +++ b/server/modules/routing/avro/avro_client.c @@ -775,48 +775,54 @@ static bool avro_client_stream_data(AVRO_CLIENT *client) char filename[PATH_MAX + 1]; snprintf(filename, PATH_MAX, "%s/%s", router->avrodir, client->avro_binfile); + bool ok = true; + spinlock_acquire(&client->file_lock); - if (client->file_handle == NULL) + if (client->file_handle == NULL && + (client->file_handle = maxavro_file_open(filename)) == NULL) { - client->file_handle = maxavro_file_open(filename); + ok = false; } spinlock_release(&client->file_lock); - switch (client->format) + if (ok) { - case AVRO_FORMAT_JSON: - /** Currently only JSON format supports seeking to a GTID */ - if (client->requested_gtid && - seek_to_index_pos(client, client->file_handle) && - seek_to_gtid(client, client->file_handle)) - { - client->requested_gtid = false; - } + switch (client->format) + { + case AVRO_FORMAT_JSON: + /** Currently only JSON format supports seeking to a GTID */ + if (client->requested_gtid && + seek_to_index_pos(client, client->file_handle) && + seek_to_gtid(client, client->file_handle)) + { + client->requested_gtid = false; + } - read_more = stream_json(client); - break; + read_more = stream_json(client); + break; - case AVRO_FORMAT_AVRO: - read_more = stream_binary(client); - break; + case AVRO_FORMAT_AVRO: + read_more = stream_binary(client); + break; - default: - MXS_ERROR("Unexpected format: %d", client->format); - break; + default: + MXS_ERROR("Unexpected format: %d", client->format); + break; + } + + + if (maxavro_get_error(client->file_handle) != MAXAVRO_ERR_NONE) + { + MXS_ERROR("Reading Avro file failed with error '%s'.", + maxavro_get_error_string(client->file_handle)); + } + + /* update client struct */ + memcpy(&client->avro_file, client->file_handle, sizeof(client->avro_file)); + + /* may be just use client->avro_file->records_read and remove this var */ + client->last_sent_pos = client->avro_file.records_read; } - - - if (maxavro_get_error(client->file_handle) != MAXAVRO_ERR_NONE) - { - MXS_ERROR("Reading Avro file failed with error '%s'.", - maxavro_get_error_string(client->file_handle)); - } - - /* update client struct */ - memcpy(&client->avro_file, client->file_handle, sizeof(client->avro_file)); - - /* may be just use client->avro_file->records_read and remove this var */ - client->last_sent_pos = client->avro_file.records_read; } else { diff --git a/server/modules/routing/avro/avro_index.c b/server/modules/routing/avro/avro_index.c index 3696adf14..905c712c4 100644 --- a/server/modules/routing/avro/avro_index.c +++ b/server/modules/routing/avro/avro_index.c @@ -72,6 +72,7 @@ int index_query_cb(void *data, int rows, char** values, char** names) void avro_index_file(AVRO_INSTANCE *router, const char* filename) { MAXAVRO_FILE *file = maxavro_file_open(filename); + if (file) { char *name = strrchr(filename, '/'); @@ -165,6 +166,10 @@ void avro_index_file(AVRO_INSTANCE *router, const char* filename) maxavro_file_close(file); } + else + { + MXS_ERROR("Failed to open file '%s' when generating file index.", filename); + } } /**