Improve cdc.py error handling
When the CDC service reponds with an error, the program prints the error and exits.
This commit is contained in:
parent
d93edf02e2
commit
c3ffc0ba9c
@ -20,7 +20,10 @@ import selectors
|
||||
import binascii
|
||||
import os
|
||||
|
||||
schema_read = False
|
||||
|
||||
def read_data():
|
||||
global schema_read
|
||||
sel = selectors.DefaultSelector()
|
||||
sel.register(sock, selectors.EVENT_READ)
|
||||
|
||||
@ -29,8 +32,17 @@ def read_data():
|
||||
events = sel.select(timeout=int(opts.read_timeout) if int(opts.read_timeout) > 0 else None)
|
||||
buf = sock.recv(4096, socket.MSG_DONTWAIT)
|
||||
if len(buf) > 0:
|
||||
# If the request for data is rejected, an error will be sent instead of the table schema
|
||||
if not schema_read:
|
||||
if "err" in buf.decode().lower():
|
||||
print(buf.decode(), file=sys.stderr)
|
||||
exit(1)
|
||||
else:
|
||||
schema_read = True
|
||||
|
||||
os.write(sys.stdout.fileno(), buf)
|
||||
sys.stdout.flush()
|
||||
|
||||
else:
|
||||
raise Exception('Socket was closed')
|
||||
|
||||
@ -40,6 +52,13 @@ def read_data():
|
||||
print(ex, file=sys.stderr)
|
||||
break
|
||||
|
||||
|
||||
def check_for_err(err):
|
||||
if "err" in err.lower().strip():
|
||||
print(err.strip(), file=sys.stderr)
|
||||
exit(1)
|
||||
|
||||
|
||||
parser = argparse.ArgumentParser(description = "CDC Binary consumer", conflict_handler="resolve")
|
||||
parser.add_argument("-h", "--host", dest="host", help="Network address where the connection is made", default="localhost")
|
||||
parser.add_argument("-P", "--port", dest="port", help="Port where the connection is made", default="4001")
|
||||
@ -60,13 +79,17 @@ auth_string += bytes(hashlib.sha1(opts.password.encode("utf_8")).hexdigest().enc
|
||||
sock.send(auth_string)
|
||||
|
||||
# Discard the response
|
||||
response = str(sock.recv(1024)).encode('utf_8')
|
||||
response = sock.recv(1024).decode()
|
||||
|
||||
check_for_err(response)
|
||||
|
||||
# Register as a client as request Avro format data
|
||||
sock.send(bytes(("REGISTER UUID=XXX-YYY_YYY, TYPE=" + opts.format).encode()))
|
||||
|
||||
# Discard the response again
|
||||
response = str(sock.recv(1024)).encode('utf_8')
|
||||
response = sock.recv(1024).decode()
|
||||
|
||||
check_for_err(response)
|
||||
|
||||
# Request a data stream
|
||||
sock.send(bytes(("REQUEST-DATA " + opts.FILE + (" " + opts.GTID if opts.GTID else "")).encode()))
|
||||
|
Loading…
x
Reference in New Issue
Block a user