Improve UTF-8 handling in avrorouter
The json_stringn function should be used instead of json_string so that strings containing null characters, as well as strings that are not null-terminated, can be embedded in the JSON values. The CDC example Python programs now decode the data as UTF-8 instead of ASCII.
This commit is contained in:
@ -103,7 +103,7 @@ uint64_t avro_length_integer(uint64_t val)
|
|||||||
*
|
*
|
||||||
* @see maxavro_get_error
|
* @see maxavro_get_error
|
||||||
*/
|
*/
|
||||||
char* maxavro_read_string(MAXAVRO_FILE* file)
|
char* maxavro_read_string(MAXAVRO_FILE* file, size_t* size)
|
||||||
{
|
{
|
||||||
char *key = NULL;
|
char *key = NULL;
|
||||||
uint64_t len;
|
uint64_t len;
|
||||||
@ -117,6 +117,7 @@ char* maxavro_read_string(MAXAVRO_FILE* file)
|
|||||||
if (nread == len)
|
if (nread == len)
|
||||||
{
|
{
|
||||||
key[len] = '\0';
|
key[len] = '\0';
|
||||||
|
*size = len;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -261,8 +262,9 @@ MAXAVRO_MAP* maxavro_map_read(MAXAVRO_FILE *file)
|
|||||||
{
|
{
|
||||||
for (long i = 0; i < blocks; i++)
|
for (long i = 0; i < blocks; i++)
|
||||||
{
|
{
|
||||||
|
size_t size;
|
||||||
MAXAVRO_MAP* val = calloc(1, sizeof(MAXAVRO_MAP));
|
MAXAVRO_MAP* val = calloc(1, sizeof(MAXAVRO_MAP));
|
||||||
if (val && (val->key = maxavro_read_string(file)) && (val->value = maxavro_read_string(file)))
|
if (val && (val->key = maxavro_read_string(file, &size)) && (val->value = maxavro_read_string(file, &size)))
|
||||||
{
|
{
|
||||||
val->next = rval;
|
val->next = rval;
|
||||||
rval = val;
|
rval = val;
|
||||||
|
@ -139,7 +139,7 @@ bool maxavro_datablock_add_double(MAXAVRO_DATABLOCK *file, double val);
|
|||||||
|
|
||||||
/** Reading primitives */
|
/** Reading primitives */
|
||||||
bool maxavro_read_integer(MAXAVRO_FILE *file, uint64_t *val);
|
bool maxavro_read_integer(MAXAVRO_FILE *file, uint64_t *val);
|
||||||
char* maxavro_read_string(MAXAVRO_FILE *file);
|
char* maxavro_read_string(MAXAVRO_FILE *file, size_t *size);
|
||||||
bool maxavro_skip_string(MAXAVRO_FILE* file);
|
bool maxavro_skip_string(MAXAVRO_FILE* file);
|
||||||
bool maxavro_read_float(MAXAVRO_FILE *file, float *dest);
|
bool maxavro_read_float(MAXAVRO_FILE *file, float *dest);
|
||||||
bool maxavro_read_double(MAXAVRO_FILE *file, double *dest);
|
bool maxavro_read_double(MAXAVRO_FILE *file, double *dest);
|
||||||
|
@ -99,10 +99,11 @@ static json_t* read_and_pack_value(MAXAVRO_FILE *file, MAXAVRO_SCHEMA_FIELD *fie
|
|||||||
case MAXAVRO_TYPE_BYTES:
|
case MAXAVRO_TYPE_BYTES:
|
||||||
case MAXAVRO_TYPE_STRING:
|
case MAXAVRO_TYPE_STRING:
|
||||||
{
|
{
|
||||||
char *str = maxavro_read_string(file);
|
size_t len;
|
||||||
|
char *str = maxavro_read_string(file, &len);
|
||||||
if (str)
|
if (str)
|
||||||
{
|
{
|
||||||
value = json_string(str);
|
value = json_stringn(str, len);
|
||||||
free(str);
|
free(str);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -37,7 +37,7 @@ def read_json():
|
|||||||
rbuf += buf
|
rbuf += buf
|
||||||
while True:
|
while True:
|
||||||
rbuf = rbuf.lstrip()
|
rbuf = rbuf.lstrip()
|
||||||
data = decoder.raw_decode(rbuf.decode('ascii'))
|
data = decoder.raw_decode(rbuf.decode('utf_8'))
|
||||||
rbuf = rbuf[data[1]:]
|
rbuf = rbuf[data[1]:]
|
||||||
print(json.dumps(data[0]))
|
print(json.dumps(data[0]))
|
||||||
except ValueError as err:
|
except ValueError as err:
|
||||||
|
@ -45,7 +45,7 @@ while True:
|
|||||||
|
|
||||||
while True:
|
while True:
|
||||||
rbuf = rbuf.lstrip()
|
rbuf = rbuf.lstrip()
|
||||||
data = decoder.raw_decode(rbuf.decode('ascii'))
|
data = decoder.raw_decode(rbuf.decode('utf_8'))
|
||||||
rbuf = rbuf[data[1]:]
|
rbuf = rbuf[data[1]:]
|
||||||
producer.send(topic=opts.kafka_topic, value=json.dumps(data[0]).encode())
|
producer.send(topic=opts.kafka_topic, value=json.dumps(data[0]).encode())
|
||||||
producer.flush()
|
producer.flush()
|
||||||
|
@ -52,4 +52,4 @@ else:
|
|||||||
sock.send(bytes("QUERY-LAST-TRANSACTION".encode()))
|
sock.send(bytes("QUERY-LAST-TRANSACTION".encode()))
|
||||||
|
|
||||||
response = sock.recv(1024)
|
response = sock.recv(1024)
|
||||||
print(response.decode('ascii'))
|
print(response.decode('utf_8'))
|
||||||
|
Reference in New Issue
Block a user