MXS-1271: Remove redundant JSON parsing
As the cdc.py script outputs newline delimited JSON, there's no need to parse the JSON in the cdc_kafka_producer.py script.
This commit is contained in:
@ -16,7 +16,6 @@
|
|||||||
# pip install kafka-python
|
# pip install kafka-python
|
||||||
#
|
#
|
||||||
|
|
||||||
import json
|
|
||||||
import sys
|
import sys
|
||||||
import argparse
|
import argparse
|
||||||
from kafka import KafkaProducer
|
from kafka import KafkaProducer
|
||||||
@ -30,8 +29,6 @@ parser.add_argument("-T", "--kafka-topic", dest="kafka_topic",
|
|||||||
default=None, required=True)
|
default=None, required=True)
|
||||||
|
|
||||||
opts = parser.parse_args(sys.argv[1:])
|
opts = parser.parse_args(sys.argv[1:])
|
||||||
decoder = json.JSONDecoder()
|
|
||||||
rbuf = bytes()
|
|
||||||
producer = KafkaProducer(bootstrap_servers=[opts.kafka_broker])
|
producer = KafkaProducer(bootstrap_servers=[opts.kafka_broker])
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
@ -41,18 +38,9 @@ while True:
|
|||||||
if len(buf) == 0:
|
if len(buf) == 0:
|
||||||
break
|
break
|
||||||
|
|
||||||
rbuf += buf.encode()
|
data = buf.encode().strip()
|
||||||
|
producer.send(topic=opts.kafka_topic, value=data)
|
||||||
while True:
|
producer.flush()
|
||||||
rbuf = rbuf.lstrip()
|
|
||||||
data = decoder.raw_decode(rbuf.decode('utf_8'))
|
|
||||||
rbuf = rbuf[data[1]:]
|
|
||||||
producer.send(topic=opts.kafka_topic, value=json.dumps(data[0]).encode())
|
|
||||||
producer.flush()
|
|
||||||
|
|
||||||
# JSONDecoder will return a ValueError if a partial JSON object is read
|
|
||||||
except ValueError as err:
|
|
||||||
pass
|
|
||||||
|
|
||||||
# All other errors should interrupt the processing
|
# All other errors should interrupt the processing
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
|
Reference in New Issue
Block a user