Merge branch '2.0' into develop

This commit is contained in:
Johan Wikman
2016-09-09 15:12:58 +03:00
48 changed files with 1658 additions and 547 deletions

View File

@ -0,0 +1,5 @@
install_script(cdc.py core)
install_script(cdc_users.py core)
install_script(cdc_last_transaction.py core)
install_script(cdc_kafka_producer.py core)
install_file(cdc_schema.go core)

View File

@ -0,0 +1,97 @@
#!/usr/bin/env python3
# Copyright (c) 2016 MariaDB Corporation Ab
#
# Use of this software is governed by the Business Source License included
# in the LICENSE.TXT file and at www.mariadb.com/bsl.
#
# Change Date: 2019-07-01
#
# On the date above, in accordance with the Business Source License, use
# of this software will be governed by version 2 or later of the General
# Public License.
import time
import json
import re
import sys
import socket
import hashlib
import argparse
import subprocess
import selectors
import binascii
import os
# Read data as JSON
def read_json():
decoder = json.JSONDecoder()
rbuf = bytes()
ep = selectors.EpollSelector()
ep.register(sock, selectors.EVENT_READ)
while True:
pollrc = ep.select(timeout=int(opts.read_timeout) if int(opts.read_timeout) > 0 else None)
try:
buf = sock.recv(4096, socket.MSG_DONTWAIT)
rbuf += buf
while True:
rbuf = rbuf.lstrip()
data = decoder.raw_decode(rbuf.decode('ascii'))
rbuf = rbuf[data[1]:]
print(json.dumps(data[0]))
except ValueError as err:
sys.stdout.flush()
pass
except Exception:
break
# Read data as Avro
def read_avro():
ep = selectors.EpollSelector()
ep.register(sock, selectors.EVENT_READ)
while True:
pollrc = ep.select(timeout=int(opts.read_timeout) if int(opts.read_timeout) > 0 else None)
try:
buf = sock.recv(4096, socket.MSG_DONTWAIT)
os.write(sys.stdout.fileno(), buf)
sys.stdout.flush()
except Exception:
break
parser = argparse.ArgumentParser(description = "CDC Binary consumer", conflict_handler="resolve")
parser.add_argument("-h", "--host", dest="host", help="Network address where the connection is made", default="localhost")
parser.add_argument("-P", "--port", dest="port", help="Port where the connection is made", default="4001")
parser.add_argument("-u", "--user", dest="user", help="Username used when connecting", default="")
parser.add_argument("-p", "--password", dest="password", help="Password used when connecting", default="")
parser.add_argument("-f", "--format", dest="format", help="Data transmission format", default="JSON", choices=["AVRO", "JSON"])
parser.add_argument("-t", "--timeout", dest="read_timeout", help="Read timeout", default=0)
parser.add_argument("FILE", help="Requested table name in the following format: DATABASE.TABLE[.VERSION]")
parser.add_argument("GTID", help="Requested GTID position", default=None, nargs='?')
opts = parser.parse_args(sys.argv[1:])
sock = socket.create_connection([opts.host, opts.port])
# Authentication
auth_string = binascii.b2a_hex((opts.user + ":").encode())
auth_string += bytes(hashlib.sha1(opts.password.encode("utf_8")).hexdigest().encode())
sock.send(auth_string)
# Discard the response
response = str(sock.recv(1024)).encode('utf_8')
# Register as a client as request Avro format data
sock.send(bytes(("REGISTER UUID=XXX-YYY_YYY, TYPE=" + opts.format).encode()))
# Discard the response again
response = str(sock.recv(1024)).encode('utf_8')
# Request a data stream
sock.send(bytes(("REQUEST-DATA " + opts.FILE + (" " + opts.GTID if opts.GTID else "")).encode()))
if opts.format == "JSON":
read_json()
elif opts.format == "AVRO":
read_avro()

View File

@ -0,0 +1,61 @@
#!/usr/bin/env python3
# Copyright (c) 2016 MariaDB Corporation Ab
#
# Use of this software is governed by the Business Source License included
# in the LICENSE.TXT file and at www.mariadb.com/bsl.
#
# Change Date: 2019-07-01
#
# On the date above, in accordance with the Business Source License, use
# of this software will be governed by version 2 or later of the General
# Public License.
# This program requires the kafka-python package which you can install with:
#
# pip install kafka-python
#
import json
import sys
import argparse
from kafka import KafkaProducer
parser = argparse.ArgumentParser(description = "Publish JSON data read from standard input to a Kafka broker")
parser.add_argument("-K", "--kafka-broker", dest="kafka_broker",
help="Kafka broker in host:port format",
default=None, required=True)
parser.add_argument("-T", "--kafka-topic", dest="kafka_topic",
help="Kafka topic where the data is published",
default=None, required=True)
opts = parser.parse_args(sys.argv[1:])
decoder = json.JSONDecoder()
rbuf = bytes()
producer = KafkaProducer(bootstrap_servers=[opts.kafka_broker])
while True:
try:
buf = sys.stdin.read(128)
if len(buf) == 0:
break
rbuf += buf.encode()
while True:
rbuf = rbuf.lstrip()
data = decoder.raw_decode(rbuf.decode('ascii'))
rbuf = rbuf[data[1]:]
producer.send(topic=opts.kafka_topic, value=json.dumps(data[0]).encode())
# JSONDecoder will return a ValueError if a partial JSON object is read
except ValueError as err:
pass
# All other errors should interrupt the processing
except Exception as ex:
print(ex)
break
producer.flush()

View File

@ -0,0 +1,55 @@
#!/usr/bin/env python3
# Copyright (c) 2016 MariaDB Corporation Ab
#
# Use of this software is governed by the Business Source License included
# in the LICENSE.TXT file and at www.mariadb.com/bsl.
#
# Change Date: 2019-07-01
#
# On the date above, in accordance with the Business Source License, use
# of this software will be governed by version 2 or later of the General
# Public License.
import socket
import hashlib
import argparse
import binascii
import sys
parser = argparse.ArgumentParser(description = "CDC Binary consumer", conflict_handler="resolve")
parser.add_argument("-h", "--host", dest="host", help="Network address where the connection is made", default="localhost")
parser.add_argument("-P", "--port", dest="port", help="Port where the connection is made", default="4001")
parser.add_argument("-u", "--user", dest="user", help="Username used when connecting", default="")
parser.add_argument("-p", "--password", dest="password", help="Password used when connecting", default="")
parser.add_argument("-f", "--format", dest="format", help="Data transmission format", default="JSON", choices=["AVRO", "JSON"])
parser.add_argument("-t", "--timeout", dest="read_timeout", help="Read timeout", default=0)
parser.add_argument("GTID", help="Requested GTID position", default=None, nargs='?')
opts = parser.parse_args(sys.argv[1:])
sock = socket.create_connection([opts.host, opts.port])
# Authentication
auth_string = binascii.b2a_hex((opts.user + ":").encode())
auth_string += bytes(hashlib.sha1(opts.password.encode("utf_8")).hexdigest().encode())
sock.send(auth_string)
# Discard the response
response = str(sock.recv(1024)).encode('utf_8')
# Register as a client as request Avro format data
sock.send(bytes(("REGISTER UUID=XXX-YYY_YYY, TYPE=" + opts.format).encode()))
# Discard the response again
response = str(sock.recv(1024)).encode('utf_8')
# Register as a client as request Avro format data
if opts.GTID:
sock.send(bytes(("QUERY-TRANSACTION " + opts.GTID).encode()))
else:
sock.send(bytes("QUERY-LAST-TRANSACTION".encode()))
response = sock.recv(1024)
print(response.decode('ascii'))

View File

@ -0,0 +1,168 @@
// Copyright (c) 2016 MariaDB Corporation Ab
//
// Use of this software is governed by the Business Source License included
// in the LICENSE.TXT file and at www.mariadb.com/bsl.
//
// Change Date: 2019-07-01
//
// On the date above, in accordance with the Business Source License, use
// of this software will be governed by version 2 or later of the General
// Public License.
package main
import (
"database/sql"
"encoding/json"
"flag"
"log"
"os"
"regexp"
"strconv"
)
import _ "github.com/go-sql-driver/mysql"
var host = flag.String("host", "localhost", "Server address")
var port = flag.Int("port", 3306, "Server port")
var user = flag.String("user", "", "Server user")
var passwd = flag.String("password", "", "Server password")
var debug = flag.Bool("debug", false, "Debug output")
// Avro field
type Field struct {
Name string `json:"name"`
Type string `json:"type"`
}
// Avro schema
type Schema struct {
Namespace string `json:"namespace"`
Type string `json:"type"`
Name string `json:"name"`
Fields []Field `json:"fields"`
}
// Debugging output helper function
func LogObject(obj interface{}) {
js, err := json.Marshal(obj)
if err != nil {
log.Fatal("Failed to parse object: ", err)
} else {
log.Println("Unsupported type: ", string(js))
}
}
var field_re *regexp.Regexp
// Convert the SQL type to the appropriate Avro type
func (f *Field) ToAvroType() {
f.Type = field_re.ReplaceAllString(f.Type, "")
switch f.Type {
case "date", "datetime", "time", "timestamp", "year", "tinytext", "text",
"mediumtext", "longtext", "char", "varchar", "enum", "set":
f.Type = "string"
case "tinyblob", "blob", "mediumblob", "longblob", "binary", "varbinary":
f.Type = "bytes"
case "int", "smallint", "mediumint", "integer", "tinyint", "short",
"decimal", "bit":
f.Type = "int"
case "float":
f.Type = "float"
case "double":
f.Type = "double"
case "null":
f.Type = "null"
case "long", "bigint":
f.Type = "long"
default:
LogObject(f)
f.Type = "string"
}
}
// Create and store the Avro schema to disk
func StoreSchema(db *sql.DB, schema, table string) {
file, err := os.Create(schema + "." + table + ".000001.avsc")
if err != nil {
log.Fatal("Failed to create file:", err)
}
defer file.Close()
encoder := json.NewEncoder(file)
fields, err := db.Query("DESCRIBE " + schema + "." + table)
if err != nil {
log.Fatal("Failed to query for description of "+schema+"."+table+": ", err)
}
defer fields.Close()
fieldlist := make([]Field, 0, 10)
for fields.Next() {
var field Field
var Null, Key, Default, Extra string
fields.Scan(&field.Name, &field.Type, &Null, &Key, &Default, &Extra)
field.ToAvroType()
fieldlist = append(fieldlist, field)
}
encoder.Encode(Schema{Namespace: "MaxScaleChangeDataSchema.avro", Type: "record", Name: "ChangeRecord", Fields: fieldlist})
}
// Main funtion that queries the database for table names
func main() {
var err error
field_re, err = regexp.Compile("[(].*")
if err != nil {
log.Fatal("Error: ", err)
}
flag.Parse()
var connect_str string = *user + ":" + *passwd + "@tcp(" + *host + ":" + strconv.Itoa(*port) + ")/"
if *debug {
log.Println("Connect string: ", connect_str)
}
db, err := sql.Open("mysql", connect_str)
if err != nil {
log.Fatal("Failed to open connection to", *host, *port, ":", err)
}
defer db.Close()
databases, err := db.Query("SHOW DATABASES")
if err != nil {
log.Fatal("Failed to query for databases: ", err)
}
defer databases.Close()
for databases.Next() {
var schemaname string
databases.Scan(&schemaname)
// Skip the system databases
switch schemaname {
case "mysql", "information_schema", "performance_schema":
continue
}
tables, err := db.Query("SHOW TABLES FROM " + schemaname)
if err != nil {
log.Fatal("Failed to query for tables from "+schemaname+": ", err)
}
defer tables.Close()
for tables.Next() {
var tablename string
tables.Scan(&tablename)
if *debug {
log.Println("Processing", schemaname, ".", tablename)
}
StoreSchema(db, schemaname, tablename)
}
}
}

View File

@ -0,0 +1,21 @@
#!/usr/bin/env python3
# Copyright (c) 2016 MariaDB Corporation Ab
#
# Use of this software is governed by the Business Source License included
# in the LICENSE.TXT file and at www.mariadb.com/bsl.
#
# Change Date: 2019-07-01
#
# On the date above, in accordance with the Business Source License, use
# of this software will be governed by version 2 or later of the General
# Public License.
import sys, binascii, hashlib, argparse
parser = argparse.ArgumentParser(description = "CDC User manager", epilog = "Append the output of this program to /var/lib/maxscale/<service name>/cdcusers")
parser.add_argument("USER", help="Username")
parser.add_argument("PASSWORD", help="Password")
opts = parser.parse_args(sys.argv[1:])
print((opts.USER + ":") + hashlib.sha1(hashlib.sha1(opts.PASSWORD.encode()).digest()).hexdigest().upper())