Print all output as raw bytes
Printing all output as raw bytes allows MaxScale to control the formatting process. This also removes the need to convert the bytes to Python strings and the need to parse the JSON.
This commit is contained in:
@ -12,52 +12,32 @@
|
|||||||
# Public License.
|
# Public License.
|
||||||
|
|
||||||
import time
|
import time
|
||||||
import json
|
|
||||||
import re
|
|
||||||
import sys
|
import sys
|
||||||
import socket
|
import socket
|
||||||
import hashlib
|
import hashlib
|
||||||
import argparse
|
import argparse
|
||||||
import subprocess
|
|
||||||
import selectors
|
import selectors
|
||||||
import binascii
|
import binascii
|
||||||
import os
|
import os
|
||||||
|
|
||||||
# Read data as JSON
|
def read_data():
|
||||||
def read_json():
|
sel = selectors.DefaultSelector()
|
||||||
decoder = json.JSONDecoder()
|
sel.register(sock, selectors.EVENT_READ)
|
||||||
rbuf = bytes()
|
|
||||||
ep = selectors.EpollSelector()
|
|
||||||
ep.register(sock, selectors.EVENT_READ)
|
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
pollrc = ep.select(timeout=int(opts.read_timeout) if int(opts.read_timeout) > 0 else None)
|
|
||||||
try:
|
try:
|
||||||
|
events = sel.select(timeout=int(opts.read_timeout) if int(opts.read_timeout) > 0 else None)
|
||||||
buf = sock.recv(4096, socket.MSG_DONTWAIT)
|
buf = sock.recv(4096, socket.MSG_DONTWAIT)
|
||||||
rbuf += buf
|
if len(buf) > 0:
|
||||||
while True:
|
os.write(sys.stdout.fileno(), buf)
|
||||||
rbuf = rbuf.lstrip()
|
sys.stdout.flush()
|
||||||
data = decoder.raw_decode(rbuf.decode('ascii'))
|
else:
|
||||||
rbuf = rbuf[data[1]:]
|
raise Exception('Socket was closed')
|
||||||
print(json.dumps(data[0]))
|
|
||||||
except ValueError as err:
|
except BlockingIOError:
|
||||||
sys.stdout.flush()
|
|
||||||
pass
|
|
||||||
except Exception:
|
|
||||||
break
|
break
|
||||||
|
except Exception as ex:
|
||||||
# Read data as Avro
|
print(ex, file=sys.stderr)
|
||||||
def read_avro():
|
|
||||||
ep = selectors.EpollSelector()
|
|
||||||
ep.register(sock, selectors.EVENT_READ)
|
|
||||||
|
|
||||||
while True:
|
|
||||||
pollrc = ep.select(timeout=int(opts.read_timeout) if int(opts.read_timeout) > 0 else None)
|
|
||||||
try:
|
|
||||||
buf = sock.recv(4096, socket.MSG_DONTWAIT)
|
|
||||||
os.write(sys.stdout.fileno(), buf)
|
|
||||||
sys.stdout.flush()
|
|
||||||
except Exception:
|
|
||||||
break
|
break
|
||||||
|
|
||||||
parser = argparse.ArgumentParser(description = "CDC Binary consumer", conflict_handler="resolve")
|
parser = argparse.ArgumentParser(description = "CDC Binary consumer", conflict_handler="resolve")
|
||||||
@ -91,7 +71,4 @@ response = str(sock.recv(1024)).encode('utf_8')
|
|||||||
# Request a data stream
|
# Request a data stream
|
||||||
sock.send(bytes(("REQUEST-DATA " + opts.FILE + (" " + opts.GTID if opts.GTID else "")).encode()))
|
sock.send(bytes(("REQUEST-DATA " + opts.FILE + (" " + opts.GTID if opts.GTID else "")).encode()))
|
||||||
|
|
||||||
if opts.format == "JSON":
|
read_data()
|
||||||
read_json()
|
|
||||||
elif opts.format == "AVRO":
|
|
||||||
read_avro()
|
|
||||||
|
Reference in New Issue
Block a user