diff --git a/avro/maxavro_file.c b/avro/maxavro_file.c index c1d9387a4..8e94da357 100644 --- a/avro/maxavro_file.c +++ b/avro/maxavro_file.c @@ -12,6 +12,7 @@ */ #include "maxavro.h" +#include "skygw_utils.h" #include #include #include @@ -49,11 +50,12 @@ bool maxavro_verify_block(MAXAVRO_FILE *file) int rc = fread(sync, 1, SYNC_MARKER_SIZE, file->file); if (rc != SYNC_MARKER_SIZE) { - if (rc == -1) + if (ferror(file->file)) { - MXS_ERROR("Failed to read file: %d %s", errno, strerror(errno)); + char err[STRERROR_BUFLEN]; + MXS_ERROR("Failed to read file: %d %s", errno, strerror_r(errno, err, sizeof(err))); } - else + else if (rc > 0 || !feof(file->file)) { MXS_ERROR("Short read when reading sync marker. Read %d bytes instead of %d", rc, SYNC_MARKER_SIZE); diff --git a/avro/maxavro_record.c b/avro/maxavro_record.c index 982913cea..fb5b6b5f3 100644 --- a/avro/maxavro_record.c +++ b/avro/maxavro_record.c @@ -49,9 +49,11 @@ static json_t* read_and_pack_value(MAXAVRO_FILE *file, MAXAVRO_SCHEMA_FIELD *fie case MAXAVRO_TYPE_LONG: { uint64_t val = 0; - maxavro_read_integer(file, &val); - json_int_t jsonint = val; - value = json_pack("I", jsonint); + if (maxavro_read_integer(file, &val)) + { + json_int_t jsonint = val; + value = json_pack("I", jsonint); + } } break; @@ -74,11 +76,23 @@ static json_t* read_and_pack_value(MAXAVRO_FILE *file, MAXAVRO_SCHEMA_FIELD *fie break; case MAXAVRO_TYPE_FLOAT: + { + float f = 0; + if (maxavro_read_float(file, &f)) + { + double d = f; + value = json_pack("f", d); + } + } + + break; case MAXAVRO_TYPE_DOUBLE: { double d = 0; - maxavro_read_double(file, &d); - value = json_pack("f", d); + if (maxavro_read_double(file, &d)) + { + value = json_pack("f", d); + } } break; diff --git a/server/core/mysql_binlog.c b/server/core/mysql_binlog.c index e67934885..8eeaaa02e 100644 --- a/server/core/mysql_binlog.c +++ b/server/core/mysql_binlog.c @@ -290,7 +290,7 @@ static void unpack_datetime2(uint8_t *ptr, uint8_t decimals, struct tm *dest) dest->tm_min = (time >> 6) % (1 << 6); dest->tm_hour = time >> 12; dest->tm_mday = date % (1 << 5); - dest->tm_mon = yearmonth % 13; + dest->tm_mon = (yearmonth % 13) - 1; /** struct tm stores the year as: Year - 1900 */ dest->tm_year = (yearmonth / 13) - 1900; @@ -347,7 +347,7 @@ static void unpack_date(uint8_t *ptr, struct tm *dest) uint64_t val = ptr[0] + (ptr[1] << 8) + (ptr[2] << 16); memset(dest, 0, sizeof(struct tm)); dest->tm_mday = val & 31; - dest->tm_mon = (val >> 5) & 15; + dest->tm_mon = ((val >> 5) & 15) - 1; dest->tm_year = (val >> 9) - 1900; } @@ -560,34 +560,42 @@ static uint64_t unpack_bytes(uint8_t *ptr, size_t bytes) switch (bytes) { - case 1: - val = ptr[0]; - break; - case 2: - val = ptr[1] | ((uint64_t)(ptr[0]) << 8); - break; - case 3: - val = (uint64_t)ptr[2] | ((uint64_t)ptr[1] << 8) | ((uint64_t)ptr[0] << 16); - break; - case 4: - val = (uint64_t)ptr[3] | ((uint64_t)ptr[2] << 8) | ((uint64_t)ptr[1] << 16) | ((uint64_t)ptr[0] << 24); - break; - case 5: - val = (uint64_t)ptr[4] | ((uint64_t)ptr[3] << 8) | ((uint64_t)ptr[2] << 16) | ((uint64_t)ptr[1] << 24) | (( - uint64_t)ptr[0] << 32); - break; - case 6: - val = (uint64_t)ptr[5] | ((uint64_t)ptr[4] << 8) | ((uint64_t)ptr[3] << 16) | ((uint64_t)ptr[2] << 24) | (( - uint64_t)ptr[1] << 32) | ((uint64_t)ptr[0] << 40); - break; - case 7: - val = (uint64_t)ptr[6] | ((uint64_t)ptr[5] << 8) | ((uint64_t)ptr[4] << 16) | ((uint64_t)ptr[3] << 24) | (( - uint64_t)ptr[2] << 32) | ((uint64_t)ptr[1] << 40) | ((uint64_t)ptr[0] << 48); - break; - case 8: - val = (uint64_t)ptr[7] | ((uint64_t)ptr[6] << 8) | ((uint64_t)ptr[5] << 16) | ((uint64_t)ptr[4] << 24) | (( - uint64_t)ptr[3] << 32) | ((uint64_t)ptr[2] << 40) | ((uint64_t)ptr[1] << 48) | ((uint64_t)ptr[0] << 56); - break; + case 1: + val = ptr[0]; + break; + case 2: + val = ptr[1] | ((uint64_t)(ptr[0]) << 8); + break; + case 3: + val = (uint64_t)ptr[2] | ((uint64_t)ptr[1] << 8) | + ((uint64_t)ptr[0] << 16); + break; + case 4: + val = (uint64_t)ptr[3] | ((uint64_t)ptr[2] << 8) | + ((uint64_t)ptr[1] << 16) | ((uint64_t)ptr[0] << 24); + break; + case 5: + val = (uint64_t)ptr[4] | ((uint64_t)ptr[3] << 8) | + ((uint64_t)ptr[2] << 16) | ((uint64_t)ptr[1] << 24) | + ((uint64_t)ptr[0] << 32); + break; + case 6: + val = (uint64_t)ptr[5] | ((uint64_t)ptr[4] << 8) | + ((uint64_t)ptr[3] << 16) | ((uint64_t)ptr[2] << 24) | + ((uint64_t)ptr[1] << 32) | ((uint64_t)ptr[0] << 40); + break; + case 7: + val = (uint64_t)ptr[6] | ((uint64_t)ptr[5] << 8) | + ((uint64_t)ptr[4] << 16) | ((uint64_t)ptr[3] << 24) | + ((uint64_t)ptr[2] << 32) | ((uint64_t)ptr[1] << 40) | + ((uint64_t)ptr[0] << 48); + break; + case 8: + val = (uint64_t)ptr[7] | ((uint64_t)ptr[6] << 8) | + ((uint64_t)ptr[5] << 16) | ((uint64_t)ptr[4] << 24) | + ((uint64_t)ptr[3] << 32) | ((uint64_t)ptr[2] << 40) | + ((uint64_t)ptr[1] << 48) | ((uint64_t)ptr[0] << 56); + break; } return val; @@ -608,12 +616,11 @@ size_t unpack_decimal_field(uint8_t *ptr, uint8_t *metadata, double *val_float) int fbytes = fpart1 * 4 + dig_bytes[fpart2]; /** Remove the sign bit and store it locally */ - bool signed_int = (ptr[0] & 0x80); + bool negative = (ptr[0] & 0x80) == 0; + ptr[0] ^= 0x80; - if (!signed_int) + if (negative) { - ptr[0] |= 0x80; - for (int i = 0; i < ibytes; i++) { ptr[i] = ~ptr[i]; @@ -628,7 +635,7 @@ size_t unpack_decimal_field(uint8_t *ptr, uint8_t *metadata, double *val_float) int64_t val_i = unpack_bytes(ptr, ibytes); int64_t val_f = fbytes ? unpack_bytes(ptr + ibytes, fbytes) : 0; - if (!signed_int) + if (negative) { val_i = -val_i; val_f = -val_f; diff --git a/server/modules/routing/avrorouter/avro.c b/server/modules/routing/avrorouter/avro.c index 435cc8fda..49eda9f41 100644 --- a/server/modules/routing/avrorouter/avro.c +++ b/server/modules/routing/avrorouter/avro.c @@ -991,7 +991,8 @@ extract_message(GWBUF *errpkt) * */ static void -errorReply(MXS_ROUTER *instance, void *router_session, GWBUF *message, DCB *backend_dcb, mxs_error_action_t action, +errorReply(MXS_ROUTER *instance, void *router_session, GWBUF *message, DCB *backend_dcb, + mxs_error_action_t action, bool *succp) { /** We should never end up here */ diff --git a/server/modules/routing/avrorouter/avro_client.c b/server/modules/routing/avrorouter/avro_client.c index 811739d40..9957524db 100644 --- a/server/modules/routing/avrorouter/avro_client.c +++ b/server/modules/routing/avrorouter/avro_client.c @@ -788,25 +788,25 @@ static bool avro_client_stream_data(AVRO_CLIENT *client) { switch (client->format) { - case AVRO_FORMAT_JSON: - /** Currently only JSON format supports seeking to a GTID */ - if (client->requested_gtid && - seek_to_index_pos(client, client->file_handle) && - seek_to_gtid(client, client->file_handle)) - { - client->requested_gtid = false; - } + case AVRO_FORMAT_JSON: + /** Currently only JSON format supports seeking to a GTID */ + if (client->requested_gtid && + seek_to_index_pos(client, client->file_handle) && + seek_to_gtid(client, client->file_handle)) + { + client->requested_gtid = false; + } - read_more = stream_json(client); - break; + read_more = stream_json(client); + break; - case AVRO_FORMAT_AVRO: - read_more = stream_binary(client); - break; + case AVRO_FORMAT_AVRO: + read_more = stream_binary(client); + break; - default: - MXS_ERROR("Unexpected format: %d", client->format); - break; + default: + MXS_ERROR("Unexpected format: %d", client->format); + break; } @@ -847,13 +847,15 @@ GWBUF* read_avro_json_schema(const char *avrofile, const char* dir) if (file) { int nread; - while ((nread = fread(buffer, 1, sizeof(buffer), file)) > 0) + while ((nread = fread(buffer, 1, sizeof(buffer) - 1, file)) > 0) { while (isspace(buffer[nread - 1])) { nread--; } + buffer[nread++] = '\n'; + GWBUF * newbuf = gwbuf_alloc_and_load(nread, buffer); if (newbuf) diff --git a/server/modules/routing/avrorouter/avro_index.c b/server/modules/routing/avrorouter/avro_index.c index b7139ae69..44e0e1b43 100644 --- a/server/modules/routing/avrorouter/avro_index.c +++ b/server/modules/routing/avrorouter/avro_index.c @@ -88,18 +88,22 @@ void avro_index_file(AVRO_INSTANCE *router, const char* filename) snprintf(sql, sizeof(sql), "SELECT position FROM "INDEX_TABLE_NAME " WHERE filename=\"%s\";", name); + if (sqlite3_exec(router->sqlite_handle, sql, index_query_cb, &pos, &errmsg) != SQLITE_OK) { MXS_ERROR("Failed to read last indexed position of file '%s': %s", name, errmsg); + sqlite3_free(errmsg); + maxavro_file_close(file); + return; } - else if (pos > 0) + + /** Continue from last position */ + if (pos > 0 && !maxavro_record_set_pos(file, pos)) { - /** Continue from last position */ - maxavro_record_set_pos(file, pos); + maxavro_file_close(file); + return; } - sqlite3_free(errmsg); - errmsg = NULL; gtid_pos_t prev_gtid = {0, 0, 0, 0, 0}; diff --git a/server/modules/routing/avrorouter/avro_rbr.c b/server/modules/routing/avrorouter/avro_rbr.c index 19b9b4356..ca6534a3b 100644 --- a/server/modules/routing/avrorouter/avro_rbr.c +++ b/server/modules/routing/avrorouter/avro_rbr.c @@ -352,44 +352,64 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr) */ void set_numeric_field_value(avro_value_t *field, uint8_t type, uint8_t *metadata, uint8_t *value) { - int64_t i = 0; - switch (type) { case TABLE_COL_TYPE_TINY: - i = *value; - avro_value_set_int(field, i); - break; + { + char c = *value; + avro_value_set_int(field, c); + break; + } case TABLE_COL_TYPE_SHORT: - memcpy(&i, value, 2); - avro_value_set_int(field, i); - break; + { + short s = gw_mysql_get_byte2(value); + avro_value_set_int(field, s); + break; + } case TABLE_COL_TYPE_INT24: - memcpy(&i, value, 3); - avro_value_set_int(field, i); - break; + { + int x = gw_mysql_get_byte3(value); + + if (x & 0x800000) + { + x = -((0xffffff & (~x)) + 1); + } + + avro_value_set_int(field, x); + break; + } case TABLE_COL_TYPE_LONG: - memcpy(&i, value, 4); - avro_value_set_int(field, i); - break; + { + int x = gw_mysql_get_byte4(value); + avro_value_set_int(field, x); + break; + } case TABLE_COL_TYPE_LONGLONG: - memcpy(&i, value, 8); - avro_value_set_int(field, i); - break; + { + long l = gw_mysql_get_byte8(value); + avro_value_set_long(field, l); + break; + } case TABLE_COL_TYPE_FLOAT: - memcpy(&i, value, 4); - avro_value_set_float(field, (float)i); - break; + { + float f = 0; + memcpy(&f, value, 4); + avro_value_set_float(field, f); + break; + } case TABLE_COL_TYPE_DOUBLE: - memcpy(&i, value, 8); - avro_value_set_float(field, (double)i); - break; + { + double d = 0; + memcpy(&d, value, 8); + avro_value_set_double(field, d); + break; + } default: break;