MXS-1271: Remove redundant JSON parsing
As the cdc.py script outputs newline delimited JSON, there's no need to parse the JSON in the cdc_kafka_producer.py script.
This commit is contained in:
parent
ddc7b0fb09
commit
a3cfabf37b
@ -16,7 +16,6 @@
|
||||
# pip install kafka-python
|
||||
#
|
||||
|
||||
import json
|
||||
import sys
|
||||
import argparse
|
||||
from kafka import KafkaProducer
|
||||
@ -30,8 +29,6 @@ parser.add_argument("-T", "--kafka-topic", dest="kafka_topic",
|
||||
default=None, required=True)
|
||||
|
||||
opts = parser.parse_args(sys.argv[1:])
|
||||
decoder = json.JSONDecoder()
|
||||
rbuf = bytes()
|
||||
producer = KafkaProducer(bootstrap_servers=[opts.kafka_broker])
|
||||
|
||||
while True:
|
||||
@ -41,18 +38,9 @@ while True:
|
||||
if len(buf) == 0:
|
||||
break
|
||||
|
||||
rbuf += buf.encode()
|
||||
|
||||
while True:
|
||||
rbuf = rbuf.lstrip()
|
||||
data = decoder.raw_decode(rbuf.decode('ascii'))
|
||||
rbuf = rbuf[data[1]:]
|
||||
producer.send(topic=opts.kafka_topic, value=json.dumps(data[0]).encode())
|
||||
producer.flush()
|
||||
|
||||
# JSONDecoder will return a ValueError if a partial JSON object is read
|
||||
except ValueError as err:
|
||||
pass
|
||||
data = buf.encode().strip()
|
||||
producer.send(topic=opts.kafka_topic, value=data)
|
||||
producer.flush()
|
||||
|
||||
# All other errors should interrupt the processing
|
||||
except Exception as ex:
|
||||
|
Loading…
x
Reference in New Issue
Block a user