Merge branch '2.0' into 2.1
This commit is contained in:
@ -20,7 +20,10 @@ import selectors
|
||||
import binascii
|
||||
import os
|
||||
|
||||
schema_read = False
|
||||
|
||||
def read_data():
|
||||
global schema_read
|
||||
sel = selectors.DefaultSelector()
|
||||
sel.register(sock, selectors.EVENT_READ)
|
||||
|
||||
@ -29,8 +32,17 @@ def read_data():
|
||||
events = sel.select(timeout=int(opts.read_timeout) if int(opts.read_timeout) > 0 else None)
|
||||
buf = sock.recv(4096, socket.MSG_DONTWAIT)
|
||||
if len(buf) > 0:
|
||||
# If the request for data is rejected, an error will be sent instead of the table schema
|
||||
if not schema_read:
|
||||
if "err" in buf.decode().lower():
|
||||
print(buf.decode(), file=sys.stderr)
|
||||
exit(1)
|
||||
else:
|
||||
schema_read = True
|
||||
|
||||
os.write(sys.stdout.fileno(), buf)
|
||||
sys.stdout.flush()
|
||||
|
||||
else:
|
||||
raise Exception('Socket was closed')
|
||||
|
||||
@ -40,6 +52,13 @@ def read_data():
|
||||
print(ex, file=sys.stderr)
|
||||
break
|
||||
|
||||
|
||||
def check_for_err(err):
|
||||
if "err" in err.lower().strip():
|
||||
print(err.strip(), file=sys.stderr)
|
||||
exit(1)
|
||||
|
||||
|
||||
parser = argparse.ArgumentParser(description = "CDC Binary consumer", conflict_handler="resolve")
|
||||
parser.add_argument("-h", "--host", dest="host", help="Network address where the connection is made", default="localhost")
|
||||
parser.add_argument("-P", "--port", dest="port", help="Port where the connection is made", default="4001")
|
||||
@ -60,13 +79,17 @@ auth_string += bytes(hashlib.sha1(opts.password.encode("utf_8")).hexdigest().enc
|
||||
sock.send(auth_string)
|
||||
|
||||
# Discard the response
|
||||
response = str(sock.recv(1024)).encode('utf_8')
|
||||
response = sock.recv(1024).decode()
|
||||
|
||||
check_for_err(response)
|
||||
|
||||
# Register as a client as request Avro format data
|
||||
sock.send(bytes(("REGISTER UUID=XXX-YYY_YYY, TYPE=" + opts.format).encode()))
|
||||
|
||||
# Discard the response again
|
||||
response = str(sock.recv(1024)).encode('utf_8')
|
||||
response = sock.recv(1024).decode()
|
||||
|
||||
check_for_err(response)
|
||||
|
||||
# Request a data stream
|
||||
sock.send(bytes(("REQUEST-DATA " + opts.FILE + (" " + opts.GTID if opts.GTID else "")).encode()))
|
||||
|
@ -575,9 +575,12 @@ int extract_type_length(const char* ptr, char *dest)
|
||||
}
|
||||
|
||||
/** Store type */
|
||||
int typelen = ptr - start;
|
||||
memcpy(dest, start, typelen);
|
||||
dest[typelen] = '\0';
|
||||
for (const char* c = start; c < ptr; c++)
|
||||
{
|
||||
*dest++ = tolower(*c);
|
||||
}
|
||||
|
||||
*dest++ = '\0';
|
||||
|
||||
/** Skip whitespace */
|
||||
while (*ptr && isspace(*ptr))
|
||||
@ -880,7 +883,7 @@ void read_alter_identifier(const char *sql, const char *end, char *dest, int siz
|
||||
|
||||
void make_avro_token(char* dest, const char* src, int length)
|
||||
{
|
||||
while (*src == '(' || *src == ')' || *src == '`' || isspace(*src))
|
||||
while (length > 0 && (*src == '(' || *src == ')' || *src == '`' || isspace(*src)))
|
||||
{
|
||||
src++;
|
||||
length--;
|
||||
@ -902,16 +905,17 @@ void make_avro_token(char* dest, const char* src, int length)
|
||||
fix_reserved_word(dest);
|
||||
}
|
||||
|
||||
int get_column_index(TABLE_CREATE *create, const char *tok)
|
||||
int get_column_index(TABLE_CREATE *create, const char *tok, int len)
|
||||
{
|
||||
int idx = -1;
|
||||
char safe_tok[strlen(tok) + 2];
|
||||
strcpy(safe_tok, tok);
|
||||
char safe_tok[len + 2];
|
||||
memcpy(safe_tok, tok, len);
|
||||
safe_tok[len] = '\0';
|
||||
fix_reserved_word(safe_tok);
|
||||
|
||||
for (int x = 0; x < create->columns; x++)
|
||||
{
|
||||
if (strcasecmp(create->column_names[x], tok) == 0)
|
||||
if (strcasecmp(create->column_names[x], safe_tok) == 0)
|
||||
{
|
||||
idx = x;
|
||||
break;
|
||||
@ -950,18 +954,17 @@ bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end)
|
||||
{
|
||||
tok = get_tok(tok + len, &len, end);
|
||||
|
||||
char ** tmp = MXS_REALLOC(create->column_names, sizeof(char*) * create->columns + 1);
|
||||
ss_dassert(tmp);
|
||||
create->column_names = MXS_REALLOC(create->column_names, sizeof(char*) * create->columns + 1);
|
||||
create->column_types = MXS_REALLOC(create->column_types, sizeof(char*) * create->columns + 1);
|
||||
create->column_lengths = MXS_REALLOC(create->column_lengths, sizeof(int) * create->columns + 1);
|
||||
|
||||
if (tmp == NULL)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
create->column_names = tmp;
|
||||
char avro_token[len + 1];
|
||||
make_avro_token(avro_token, tok, len);
|
||||
char field_type[200] = ""; // Enough to hold all types
|
||||
int field_length = extract_type_length(tok + len, field_type);
|
||||
create->column_names[create->columns] = MXS_STRDUP_A(avro_token);
|
||||
create->column_types[create->columns] = MXS_STRDUP_A(field_type);
|
||||
create->column_lengths[create->columns] = field_length;
|
||||
create->columns++;
|
||||
updates++;
|
||||
tok = get_next_def(tok, end);
|
||||
@ -971,25 +974,22 @@ bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end)
|
||||
{
|
||||
tok = get_tok(tok + len, &len, end);
|
||||
|
||||
int idx = get_column_index(create, tok);
|
||||
int idx = get_column_index(create, tok, len);
|
||||
|
||||
if (idx != -1)
|
||||
{
|
||||
MXS_FREE(create->column_names[idx]);
|
||||
MXS_FREE(create->column_types[idx]);
|
||||
for (int i = idx; i < (int)create->columns - 1; i++)
|
||||
{
|
||||
create->column_names[i] = create->column_names[i + 1];
|
||||
create->column_types[i] = create->column_types[i + 1];
|
||||
create->column_lengths[i] = create->column_lengths[i + 1];
|
||||
}
|
||||
|
||||
char ** tmp = realloc(create->column_names, sizeof(char*) * create->columns - 1);
|
||||
ss_dassert(tmp);
|
||||
|
||||
if (tmp == NULL)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
create->column_names = tmp;
|
||||
create->column_names = MXS_REALLOC(create->column_names, sizeof(char*) * create->columns - 1);
|
||||
create->column_types = MXS_REALLOC(create->column_types, sizeof(char*) * create->columns - 1);
|
||||
create->column_lengths = MXS_REALLOC(create->column_lengths, sizeof(int) * create->columns - 1);
|
||||
create->columns--;
|
||||
updates++;
|
||||
}
|
||||
@ -1001,12 +1001,19 @@ bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end)
|
||||
{
|
||||
tok = get_tok(tok + len, &len, end);
|
||||
|
||||
int idx = get_column_index(create, tok);
|
||||
int idx = get_column_index(create, tok, len);
|
||||
|
||||
if (idx != -1)
|
||||
if (idx != -1 && (tok = get_tok(tok + len, &len, end)))
|
||||
{
|
||||
MXS_FREE(create->column_names[idx]);
|
||||
create->column_names[idx] = strndup(tok, len);
|
||||
MXS_FREE(create->column_types[idx]);
|
||||
char avro_token[len + 1];
|
||||
make_avro_token(avro_token, tok, len);
|
||||
char field_type[200] = ""; // Enough to hold all types
|
||||
int field_length = extract_type_length(tok + len, field_type);
|
||||
create->column_names[idx] = MXS_STRDUP_A(avro_token);
|
||||
create->column_types[idx] = MXS_STRDUP_A(field_type);
|
||||
create->column_lengths[idx] = field_length;
|
||||
updates++;
|
||||
}
|
||||
|
||||
@ -1021,7 +1028,7 @@ bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end)
|
||||
}
|
||||
|
||||
/** Only increment the create version if it has an associated .avro
|
||||
* file. The .avro file is only created if it is acutally used. */
|
||||
* file. The .avro file is only created if it is actually used. */
|
||||
if (updates > 0 && create->was_used)
|
||||
{
|
||||
create->version++;
|
||||
|
Reference in New Issue
Block a user