From a3cfabf37b776be475043f3cfca6d0a886c0f94e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Tue, 30 May 2017 11:49:39 +0300 Subject: [PATCH] MXS-1271: Remove redundant JSON parsing As the cdc.py script outputs newline delimited JSON, there's no need to parse the JSON in the cdc_kafka_producer.py script. --- .../protocol/examples/cdc_kafka_producer.py | 18 +++--------------- 1 file changed, 3 insertions(+), 15 deletions(-) diff --git a/server/modules/protocol/examples/cdc_kafka_producer.py b/server/modules/protocol/examples/cdc_kafka_producer.py index cddcf1bff..06bc9f577 100755 --- a/server/modules/protocol/examples/cdc_kafka_producer.py +++ b/server/modules/protocol/examples/cdc_kafka_producer.py @@ -16,7 +16,6 @@ # pip install kafka-python # -import json import sys import argparse from kafka import KafkaProducer @@ -30,8 +29,6 @@ parser.add_argument("-T", "--kafka-topic", dest="kafka_topic", default=None, required=True) opts = parser.parse_args(sys.argv[1:]) -decoder = json.JSONDecoder() -rbuf = bytes() producer = KafkaProducer(bootstrap_servers=[opts.kafka_broker]) while True: @@ -41,18 +38,9 @@ while True: if len(buf) == 0: break - rbuf += buf.encode() - - while True: - rbuf = rbuf.lstrip() - data = decoder.raw_decode(rbuf.decode('ascii')) - rbuf = rbuf[data[1]:] - producer.send(topic=opts.kafka_topic, value=json.dumps(data[0]).encode()) - producer.flush() - - # JSONDecoder will return a ValueError if a partial JSON object is read - except ValueError as err: - pass + data = buf.encode().strip() + producer.send(topic=opts.kafka_topic, value=data) + producer.flush() # All other errors should interrupt the processing except Exception as ex: