Flush the producer after every new record

As the cdc_kafka_producer script is an example, it should flush the
producer after every new record. This should make it easier to see that
events from MaxScale are sent to Kafka.
This commit is contained in:
Markus Mäkelä 2017-01-30 20:14:27 +02:00
parent 4bbd513b1e
commit 6cd16d26b8

View File

@ -48,6 +48,7 @@ while True:
data = decoder.raw_decode(rbuf.decode('ascii'))
rbuf = rbuf[data[1]:]
producer.send(topic=opts.kafka_topic, value=json.dumps(data[0]).encode())
producer.flush()
# JSONDecoder will return a ValueError if a partial JSON object is read
except ValueError as err:
@ -57,5 +58,3 @@ while True:
except Exception as ex:
print(ex)
break
producer.flush()