Merge branch '2.0' into 2.1

This commit is contained in:
Markus Mäkelä
2017-02-20 11:17:49 +02:00
7 changed files with 139 additions and 89 deletions

View File

@ -12,6 +12,7 @@
*/ */
#include "maxavro.h" #include "maxavro.h"
#include "skygw_utils.h"
#include <errno.h> #include <errno.h>
#include <string.h> #include <string.h>
#include <maxscale/log_manager.h> #include <maxscale/log_manager.h>
@ -49,11 +50,12 @@ bool maxavro_verify_block(MAXAVRO_FILE *file)
int rc = fread(sync, 1, SYNC_MARKER_SIZE, file->file); int rc = fread(sync, 1, SYNC_MARKER_SIZE, file->file);
if (rc != SYNC_MARKER_SIZE) if (rc != SYNC_MARKER_SIZE)
{ {
if (rc == -1) if (ferror(file->file))
{ {
MXS_ERROR("Failed to read file: %d %s", errno, strerror(errno)); char err[STRERROR_BUFLEN];
MXS_ERROR("Failed to read file: %d %s", errno, strerror_r(errno, err, sizeof(err)));
} }
else else if (rc > 0 || !feof(file->file))
{ {
MXS_ERROR("Short read when reading sync marker. Read %d bytes instead of %d", MXS_ERROR("Short read when reading sync marker. Read %d bytes instead of %d",
rc, SYNC_MARKER_SIZE); rc, SYNC_MARKER_SIZE);

View File

@ -49,9 +49,11 @@ static json_t* read_and_pack_value(MAXAVRO_FILE *file, MAXAVRO_SCHEMA_FIELD *fie
case MAXAVRO_TYPE_LONG: case MAXAVRO_TYPE_LONG:
{ {
uint64_t val = 0; uint64_t val = 0;
maxavro_read_integer(file, &val); if (maxavro_read_integer(file, &val))
json_int_t jsonint = val; {
value = json_pack("I", jsonint); json_int_t jsonint = val;
value = json_pack("I", jsonint);
}
} }
break; break;
@ -74,11 +76,23 @@ static json_t* read_and_pack_value(MAXAVRO_FILE *file, MAXAVRO_SCHEMA_FIELD *fie
break; break;
case MAXAVRO_TYPE_FLOAT: case MAXAVRO_TYPE_FLOAT:
{
float f = 0;
if (maxavro_read_float(file, &f))
{
double d = f;
value = json_pack("f", d);
}
}
break;
case MAXAVRO_TYPE_DOUBLE: case MAXAVRO_TYPE_DOUBLE:
{ {
double d = 0; double d = 0;
maxavro_read_double(file, &d); if (maxavro_read_double(file, &d))
value = json_pack("f", d); {
value = json_pack("f", d);
}
} }
break; break;

View File

@ -290,7 +290,7 @@ static void unpack_datetime2(uint8_t *ptr, uint8_t decimals, struct tm *dest)
dest->tm_min = (time >> 6) % (1 << 6); dest->tm_min = (time >> 6) % (1 << 6);
dest->tm_hour = time >> 12; dest->tm_hour = time >> 12;
dest->tm_mday = date % (1 << 5); dest->tm_mday = date % (1 << 5);
dest->tm_mon = yearmonth % 13; dest->tm_mon = (yearmonth % 13) - 1;
/** struct tm stores the year as: Year - 1900 */ /** struct tm stores the year as: Year - 1900 */
dest->tm_year = (yearmonth / 13) - 1900; dest->tm_year = (yearmonth / 13) - 1900;
@ -347,7 +347,7 @@ static void unpack_date(uint8_t *ptr, struct tm *dest)
uint64_t val = ptr[0] + (ptr[1] << 8) + (ptr[2] << 16); uint64_t val = ptr[0] + (ptr[1] << 8) + (ptr[2] << 16);
memset(dest, 0, sizeof(struct tm)); memset(dest, 0, sizeof(struct tm));
dest->tm_mday = val & 31; dest->tm_mday = val & 31;
dest->tm_mon = (val >> 5) & 15; dest->tm_mon = ((val >> 5) & 15) - 1;
dest->tm_year = (val >> 9) - 1900; dest->tm_year = (val >> 9) - 1900;
} }
@ -560,34 +560,42 @@ static uint64_t unpack_bytes(uint8_t *ptr, size_t bytes)
switch (bytes) switch (bytes)
{ {
case 1: case 1:
val = ptr[0]; val = ptr[0];
break; break;
case 2: case 2:
val = ptr[1] | ((uint64_t)(ptr[0]) << 8); val = ptr[1] | ((uint64_t)(ptr[0]) << 8);
break; break;
case 3: case 3:
val = (uint64_t)ptr[2] | ((uint64_t)ptr[1] << 8) | ((uint64_t)ptr[0] << 16); val = (uint64_t)ptr[2] | ((uint64_t)ptr[1] << 8) |
break; ((uint64_t)ptr[0] << 16);
case 4: break;
val = (uint64_t)ptr[3] | ((uint64_t)ptr[2] << 8) | ((uint64_t)ptr[1] << 16) | ((uint64_t)ptr[0] << 24); case 4:
break; val = (uint64_t)ptr[3] | ((uint64_t)ptr[2] << 8) |
case 5: ((uint64_t)ptr[1] << 16) | ((uint64_t)ptr[0] << 24);
val = (uint64_t)ptr[4] | ((uint64_t)ptr[3] << 8) | ((uint64_t)ptr[2] << 16) | ((uint64_t)ptr[1] << 24) | (( break;
uint64_t)ptr[0] << 32); case 5:
break; val = (uint64_t)ptr[4] | ((uint64_t)ptr[3] << 8) |
case 6: ((uint64_t)ptr[2] << 16) | ((uint64_t)ptr[1] << 24) |
val = (uint64_t)ptr[5] | ((uint64_t)ptr[4] << 8) | ((uint64_t)ptr[3] << 16) | ((uint64_t)ptr[2] << 24) | (( ((uint64_t)ptr[0] << 32);
uint64_t)ptr[1] << 32) | ((uint64_t)ptr[0] << 40); break;
break; case 6:
case 7: val = (uint64_t)ptr[5] | ((uint64_t)ptr[4] << 8) |
val = (uint64_t)ptr[6] | ((uint64_t)ptr[5] << 8) | ((uint64_t)ptr[4] << 16) | ((uint64_t)ptr[3] << 24) | (( ((uint64_t)ptr[3] << 16) | ((uint64_t)ptr[2] << 24) |
uint64_t)ptr[2] << 32) | ((uint64_t)ptr[1] << 40) | ((uint64_t)ptr[0] << 48); ((uint64_t)ptr[1] << 32) | ((uint64_t)ptr[0] << 40);
break; break;
case 8: case 7:
val = (uint64_t)ptr[7] | ((uint64_t)ptr[6] << 8) | ((uint64_t)ptr[5] << 16) | ((uint64_t)ptr[4] << 24) | (( val = (uint64_t)ptr[6] | ((uint64_t)ptr[5] << 8) |
uint64_t)ptr[3] << 32) | ((uint64_t)ptr[2] << 40) | ((uint64_t)ptr[1] << 48) | ((uint64_t)ptr[0] << 56); ((uint64_t)ptr[4] << 16) | ((uint64_t)ptr[3] << 24) |
break; ((uint64_t)ptr[2] << 32) | ((uint64_t)ptr[1] << 40) |
((uint64_t)ptr[0] << 48);
break;
case 8:
val = (uint64_t)ptr[7] | ((uint64_t)ptr[6] << 8) |
((uint64_t)ptr[5] << 16) | ((uint64_t)ptr[4] << 24) |
((uint64_t)ptr[3] << 32) | ((uint64_t)ptr[2] << 40) |
((uint64_t)ptr[1] << 48) | ((uint64_t)ptr[0] << 56);
break;
} }
return val; return val;
@ -608,12 +616,11 @@ size_t unpack_decimal_field(uint8_t *ptr, uint8_t *metadata, double *val_float)
int fbytes = fpart1 * 4 + dig_bytes[fpart2]; int fbytes = fpart1 * 4 + dig_bytes[fpart2];
/** Remove the sign bit and store it locally */ /** Remove the sign bit and store it locally */
bool signed_int = (ptr[0] & 0x80); bool negative = (ptr[0] & 0x80) == 0;
ptr[0] ^= 0x80;
if (!signed_int) if (negative)
{ {
ptr[0] |= 0x80;
for (int i = 0; i < ibytes; i++) for (int i = 0; i < ibytes; i++)
{ {
ptr[i] = ~ptr[i]; ptr[i] = ~ptr[i];
@ -628,7 +635,7 @@ size_t unpack_decimal_field(uint8_t *ptr, uint8_t *metadata, double *val_float)
int64_t val_i = unpack_bytes(ptr, ibytes); int64_t val_i = unpack_bytes(ptr, ibytes);
int64_t val_f = fbytes ? unpack_bytes(ptr + ibytes, fbytes) : 0; int64_t val_f = fbytes ? unpack_bytes(ptr + ibytes, fbytes) : 0;
if (!signed_int) if (negative)
{ {
val_i = -val_i; val_i = -val_i;
val_f = -val_f; val_f = -val_f;

View File

@ -991,7 +991,8 @@ extract_message(GWBUF *errpkt)
* *
*/ */
static void static void
errorReply(MXS_ROUTER *instance, void *router_session, GWBUF *message, DCB *backend_dcb, mxs_error_action_t action, errorReply(MXS_ROUTER *instance, void *router_session, GWBUF *message, DCB *backend_dcb,
mxs_error_action_t action,
bool *succp) bool *succp)
{ {
/** We should never end up here */ /** We should never end up here */

View File

@ -788,25 +788,25 @@ static bool avro_client_stream_data(AVRO_CLIENT *client)
{ {
switch (client->format) switch (client->format)
{ {
case AVRO_FORMAT_JSON: case AVRO_FORMAT_JSON:
/** Currently only JSON format supports seeking to a GTID */ /** Currently only JSON format supports seeking to a GTID */
if (client->requested_gtid && if (client->requested_gtid &&
seek_to_index_pos(client, client->file_handle) && seek_to_index_pos(client, client->file_handle) &&
seek_to_gtid(client, client->file_handle)) seek_to_gtid(client, client->file_handle))
{ {
client->requested_gtid = false; client->requested_gtid = false;
} }
read_more = stream_json(client); read_more = stream_json(client);
break; break;
case AVRO_FORMAT_AVRO: case AVRO_FORMAT_AVRO:
read_more = stream_binary(client); read_more = stream_binary(client);
break; break;
default: default:
MXS_ERROR("Unexpected format: %d", client->format); MXS_ERROR("Unexpected format: %d", client->format);
break; break;
} }
@ -847,13 +847,15 @@ GWBUF* read_avro_json_schema(const char *avrofile, const char* dir)
if (file) if (file)
{ {
int nread; int nread;
while ((nread = fread(buffer, 1, sizeof(buffer), file)) > 0) while ((nread = fread(buffer, 1, sizeof(buffer) - 1, file)) > 0)
{ {
while (isspace(buffer[nread - 1])) while (isspace(buffer[nread - 1]))
{ {
nread--; nread--;
} }
buffer[nread++] = '\n';
GWBUF * newbuf = gwbuf_alloc_and_load(nread, buffer); GWBUF * newbuf = gwbuf_alloc_and_load(nread, buffer);
if (newbuf) if (newbuf)

View File

@ -88,18 +88,22 @@ void avro_index_file(AVRO_INSTANCE *router, const char* filename)
snprintf(sql, sizeof(sql), "SELECT position FROM "INDEX_TABLE_NAME snprintf(sql, sizeof(sql), "SELECT position FROM "INDEX_TABLE_NAME
" WHERE filename=\"%s\";", name); " WHERE filename=\"%s\";", name);
if (sqlite3_exec(router->sqlite_handle, sql, index_query_cb, &pos, &errmsg) != SQLITE_OK) if (sqlite3_exec(router->sqlite_handle, sql, index_query_cb, &pos, &errmsg) != SQLITE_OK)
{ {
MXS_ERROR("Failed to read last indexed position of file '%s': %s", MXS_ERROR("Failed to read last indexed position of file '%s': %s",
name, errmsg); name, errmsg);
sqlite3_free(errmsg);
maxavro_file_close(file);
return;
} }
else if (pos > 0)
/** Continue from last position */
if (pos > 0 && !maxavro_record_set_pos(file, pos))
{ {
/** Continue from last position */ maxavro_file_close(file);
maxavro_record_set_pos(file, pos); return;
} }
sqlite3_free(errmsg);
errmsg = NULL;
gtid_pos_t prev_gtid = {0, 0, 0, 0, 0}; gtid_pos_t prev_gtid = {0, 0, 0, 0, 0};

View File

@ -352,44 +352,64 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
*/ */
void set_numeric_field_value(avro_value_t *field, uint8_t type, uint8_t *metadata, uint8_t *value) void set_numeric_field_value(avro_value_t *field, uint8_t type, uint8_t *metadata, uint8_t *value)
{ {
int64_t i = 0;
switch (type) switch (type)
{ {
case TABLE_COL_TYPE_TINY: case TABLE_COL_TYPE_TINY:
i = *value; {
avro_value_set_int(field, i); char c = *value;
break; avro_value_set_int(field, c);
break;
}
case TABLE_COL_TYPE_SHORT: case TABLE_COL_TYPE_SHORT:
memcpy(&i, value, 2); {
avro_value_set_int(field, i); short s = gw_mysql_get_byte2(value);
break; avro_value_set_int(field, s);
break;
}
case TABLE_COL_TYPE_INT24: case TABLE_COL_TYPE_INT24:
memcpy(&i, value, 3); {
avro_value_set_int(field, i); int x = gw_mysql_get_byte3(value);
break;
if (x & 0x800000)
{
x = -((0xffffff & (~x)) + 1);
}
avro_value_set_int(field, x);
break;
}
case TABLE_COL_TYPE_LONG: case TABLE_COL_TYPE_LONG:
memcpy(&i, value, 4); {
avro_value_set_int(field, i); int x = gw_mysql_get_byte4(value);
break; avro_value_set_int(field, x);
break;
}
case TABLE_COL_TYPE_LONGLONG: case TABLE_COL_TYPE_LONGLONG:
memcpy(&i, value, 8); {
avro_value_set_int(field, i); long l = gw_mysql_get_byte8(value);
break; avro_value_set_long(field, l);
break;
}
case TABLE_COL_TYPE_FLOAT: case TABLE_COL_TYPE_FLOAT:
memcpy(&i, value, 4); {
avro_value_set_float(field, (float)i); float f = 0;
break; memcpy(&f, value, 4);
avro_value_set_float(field, f);
break;
}
case TABLE_COL_TYPE_DOUBLE: case TABLE_COL_TYPE_DOUBLE:
memcpy(&i, value, 8); {
avro_value_set_float(field, (double)i); double d = 0;
break; memcpy(&d, value, 8);
avro_value_set_double(field, d);
break;
}
default: default:
break; break;