diff --git a/server/modules/protocol/examples/cdc.py b/server/modules/protocol/examples/cdc.py index 7929b9cb2..e44664267 100755 --- a/server/modules/protocol/examples/cdc.py +++ b/server/modules/protocol/examples/cdc.py @@ -12,52 +12,32 @@ # Public License. import time -import json -import re import sys import socket import hashlib import argparse -import subprocess import selectors import binascii import os -# Read data as JSON -def read_json(): - decoder = json.JSONDecoder() - rbuf = bytes() - ep = selectors.EpollSelector() - ep.register(sock, selectors.EVENT_READ) +def read_data(): + sel = selectors.DefaultSelector() + sel.register(sock, selectors.EVENT_READ) while True: - pollrc = ep.select(timeout=int(opts.read_timeout) if int(opts.read_timeout) > 0 else None) try: + events = sel.select(timeout=int(opts.read_timeout) if int(opts.read_timeout) > 0 else None) buf = sock.recv(4096, socket.MSG_DONTWAIT) - rbuf += buf - while True: - rbuf = rbuf.lstrip() - data = decoder.raw_decode(rbuf.decode('ascii')) - rbuf = rbuf[data[1]:] - print(json.dumps(data[0])) - except ValueError as err: - sys.stdout.flush() - pass - except Exception: + if len(buf) > 0: + os.write(sys.stdout.fileno(), buf) + sys.stdout.flush() + else: + raise Exception('Socket was closed') + + except BlockingIOError: break - -# Read data as Avro -def read_avro(): - ep = selectors.EpollSelector() - ep.register(sock, selectors.EVENT_READ) - - while True: - pollrc = ep.select(timeout=int(opts.read_timeout) if int(opts.read_timeout) > 0 else None) - try: - buf = sock.recv(4096, socket.MSG_DONTWAIT) - os.write(sys.stdout.fileno(), buf) - sys.stdout.flush() - except Exception: + except Exception as ex: + print(ex, file=sys.stderr) break parser = argparse.ArgumentParser(description = "CDC Binary consumer", conflict_handler="resolve") @@ -91,7 +71,4 @@ response = str(sock.recv(1024)).encode('utf_8') # Request a data stream sock.send(bytes(("REQUEST-DATA " + opts.FILE + (" " + opts.GTID if opts.GTID else "")).encode())) -if opts.format == "JSON": - read_json() -elif opts.format == "AVRO": - read_avro() +read_data()