diff --git a/server/modules/protocol/examples/cdc_kafka_producer.py b/server/modules/protocol/examples/cdc_kafka_producer.py index 4dcc360eb..d8d25d7ed 100755 --- a/server/modules/protocol/examples/cdc_kafka_producer.py +++ b/server/modules/protocol/examples/cdc_kafka_producer.py @@ -16,7 +16,6 @@ # pip install kafka-python # -import json import sys import argparse from kafka import KafkaProducer @@ -30,8 +29,6 @@ parser.add_argument("-T", "--kafka-topic", dest="kafka_topic", default=None, required=True) opts = parser.parse_args(sys.argv[1:]) -decoder = json.JSONDecoder() -rbuf = bytes() producer = KafkaProducer(bootstrap_servers=[opts.kafka_broker]) while True: @@ -41,18 +38,9 @@ while True: if len(buf) == 0: break - rbuf += buf.encode() - - while True: - rbuf = rbuf.lstrip() - data = decoder.raw_decode(rbuf.decode('utf_8')) - rbuf = rbuf[data[1]:] - producer.send(topic=opts.kafka_topic, value=json.dumps(data[0]).encode()) - producer.flush() - - # JSONDecoder will return a ValueError if a partial JSON object is read - except ValueError as err: - pass + data = buf.encode().strip() + producer.send(topic=opts.kafka_topic, value=data) + producer.flush() # All other errors should interrupt the processing except Exception as ex: