From 7d7d8a0560023f84bcc292d0b74b98a00fb5c910 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Fri, 17 Mar 2017 07:00:58 +0200 Subject: [PATCH] Print all output as raw bytes Printing all output as raw bytes allows MaxScale to control the formatting process. This also removes the need to convert the bytes to Python strings and the need to parse the JSON. --- server/modules/protocol/examples/cdc.py | 51 +++++++------------------ 1 file changed, 14 insertions(+), 37 deletions(-) diff --git a/server/modules/protocol/examples/cdc.py b/server/modules/protocol/examples/cdc.py index 7929b9cb2..e44664267 100755 --- a/server/modules/protocol/examples/cdc.py +++ b/server/modules/protocol/examples/cdc.py @@ -12,52 +12,32 @@ # Public License. import time -import json -import re import sys import socket import hashlib import argparse -import subprocess import selectors import binascii import os -# Read data as JSON -def read_json(): - decoder = json.JSONDecoder() - rbuf = bytes() - ep = selectors.EpollSelector() - ep.register(sock, selectors.EVENT_READ) +def read_data(): + sel = selectors.DefaultSelector() + sel.register(sock, selectors.EVENT_READ) while True: - pollrc = ep.select(timeout=int(opts.read_timeout) if int(opts.read_timeout) > 0 else None) try: + events = sel.select(timeout=int(opts.read_timeout) if int(opts.read_timeout) > 0 else None) buf = sock.recv(4096, socket.MSG_DONTWAIT) - rbuf += buf - while True: - rbuf = rbuf.lstrip() - data = decoder.raw_decode(rbuf.decode('ascii')) - rbuf = rbuf[data[1]:] - print(json.dumps(data[0])) - except ValueError as err: - sys.stdout.flush() - pass - except Exception: + if len(buf) > 0: + os.write(sys.stdout.fileno(), buf) + sys.stdout.flush() + else: + raise Exception('Socket was closed') + + except BlockingIOError: break - -# Read data as Avro -def read_avro(): - ep = selectors.EpollSelector() - ep.register(sock, selectors.EVENT_READ) - - while True: - pollrc = ep.select(timeout=int(opts.read_timeout) if int(opts.read_timeout) > 0 else None) - try: - buf = sock.recv(4096, socket.MSG_DONTWAIT) - os.write(sys.stdout.fileno(), buf) - sys.stdout.flush() - except Exception: + except Exception as ex: + print(ex, file=sys.stderr) break parser = argparse.ArgumentParser(description = "CDC Binary consumer", conflict_handler="resolve") @@ -91,7 +71,4 @@ response = str(sock.recv(1024)).encode('utf_8') # Request a data stream sock.send(bytes(("REQUEST-DATA " + opts.FILE + (" " + opts.GTID if opts.GTID else "")).encode())) -if opts.format == "JSON": - read_json() -elif opts.format == "AVRO": - read_avro() +read_data()