Markus Mäkelä 1efe3e0b20
Fix avrorouter client notifications
The notifications that tell clients about new data weren't using the
correct mechanism.
2019-05-10 09:11:39 +03:00

223 lines
7.9 KiB
C++

/*
* Copyright (c) 2018 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2022-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#pragma once
#define MXS_MODULE_NAME "avrorouter"
#include <maxscale/cdefs.h>
#include <stdbool.h>
#include <stdint.h>
#include <string>
#include <vector>
#include <maxscale/alloc.h>
#include <maxscale/dcb.h>
#include <maxscale/service.h>
#include <maxscale/mysql_binlog.h>
#include <maxscale/users.h>
#include <cdc.h>
#include <maxavro.h>
#include <binlog_common.h>
#include <maxscale/protocol/mysql.h>
#include <blr_constants.h>
#include "rpl.hh"
MXS_BEGIN_DECLS
/** Name of the file where the binlog to Avro conversion progress is stored */
#define AVRO_PROGRESS_FILE "avro-conversion.ini"
static const char* avro_client_states[] = {"Unregistered", "Registered", "Processing", "Errored"};
static const char* avro_client_client_mode[] = {"Catch-up", "Busy", "Wait_for_data"};
static const char* avro_domain = "domain";
static const char* avro_server_id = "server_id";
static const char* avro_sequence = "sequence";
static const char* avro_event_number = "event_number";
static const char* avro_event_type = "event_type";
static const char* avro_timestamp = "timestamp";
static const char* avro_client_ouput[] = {"Undefined", "JSON", "Avro"};
static inline bool is_reserved_word(const char* word)
{
return strcasecmp(word, avro_domain) == 0
|| strcasecmp(word, avro_server_id) == 0
|| strcasecmp(word, avro_sequence) == 0
|| strcasecmp(word, avro_event_number) == 0
|| strcasecmp(word, avro_event_type) == 0
|| strcasecmp(word, avro_timestamp) == 0;
}
static inline void fix_reserved_word(char* tok)
{
if (is_reserved_word(tok))
{
strcat(tok, "_");
}
}
/** How a binlog file is closed */
typedef enum avro_binlog_end
{
AVRO_OK = 0, /**< A newer binlog file exists with a rotate event to that file */
AVRO_LAST_FILE, /**< Last binlog which is closed */
AVRO_OPEN_TRANSACTION, /**< The binlog ends with an open transaction */
AVRO_BINLOG_ERROR /**< An error occurred while processing the binlog file */
} avro_binlog_end_t;
/** How many numbers each table version has (db.table.000001.avro) */
#define TABLE_MAP_VERSION_DIGITS 6
/** Maximum version number*/
#define TABLE_MAP_VERSION_MAX 999999
/** Maximum column name length */
#define TABLE_MAP_MAX_NAME_LEN 64
/** How many bytes each thread tries to send */
#define AVRO_DATA_BURST_SIZE (32 * 1024)
/** Data format used when streaming data to the clients */
enum avro_data_format
{
AVRO_FORMAT_UNDEFINED,
AVRO_FORMAT_JSON,
AVRO_FORMAT_AVRO,
};
enum mxs_avro_codec_type
{
MXS_AVRO_CODEC_NULL,
MXS_AVRO_CODEC_DEFLATE,
MXS_AVRO_CODEC_SNAPPY, /**< Not yet implemented */
};
static const MXS_ENUM_VALUE codec_values[] =
{
{"null", MXS_AVRO_CODEC_NULL },
{"deflate", MXS_AVRO_CODEC_DEFLATE},
// Not yet implemented
// {"snappy", MXS_AVRO_CODEC_SNAPPY},
{NULL}
};
class Avro : public MXS_ROUTER
{
Avro(const Avro&) = delete;
Avro& operator=(const Avro&) = delete;
public:
static Avro* create(SERVICE* service, SRowEventHandler handler);
SERVICE* service; /*< Pointer to the service using this router */
std::string filestem; /*< Root of binlog filename */
std::string binlogdir; /*< The directory where the binlog files are stored */
std::string avrodir; /*< The directory with the AVRO files */
std::string binlog_name;/*< Name of the current binlog file */
uint64_t current_pos;/*< Current binlog position */
int binlog_fd; /*< File descriptor of the binlog file being read */
uint64_t trx_count; /*< Transactions processed */
uint64_t trx_target; /*< Number of transactions that trigger a flush */
uint64_t row_count; /*< Row events processed */
uint64_t row_target; /*< Number of row events that trigger a flush */
uint32_t task_handle;/**< Delayed task handle */
Rpl handler;
private:
Avro(SERVICE* service, MXS_CONFIG_PARAMETER* params, SERVICE* source, SRowEventHandler handler);
void read_source_service_options(SERVICE* source);
};
class AvroSession : public MXS_ROUTER_SESSION
{
AvroSession(const AvroSession&) = delete;
AvroSession& operator=(const AvroSession&) = delete;
public:
static AvroSession* create(Avro* router, MXS_SESSION* session);
~AvroSession();
DCB* dcb; /*< The client DCB */
int state; /*< The state of this client */
enum avro_data_format format; /*< Stream JSON or Avro data */
std::string uuid; /*< Client UUID */
Avro* router; /*< Pointer to the owning router */
MAXAVRO_FILE* file_handle; /*< Current open file handle */
uint64_t last_sent_pos;/*< The last record we sent */
time_t connect_time; /*< Connect time of slave */
std::string avro_binfile;
bool requested_gtid; /*< If the client requested */
gtid_pos_t gtid; /*< Current/requested GTID */
gtid_pos_t gtid_start; /*< First sent GTID */
/**
* Process a client request
*
* @param Buffer The incoming request packet
*
* @return 1 on success, 0 on error
*/
int routeQuery(GWBUF* buffer);
void queue_client_callback();
private:
AvroSession(Avro* instance, MXS_SESSION* session);
int do_registration(GWBUF* data);
void process_command(GWBUF* queue);
void send_gtid_info(gtid_pos_t* gtid_pos);
void set_current_gtid(json_t* row);
bool stream_json();
bool stream_binary();
bool seek_to_gtid();
bool stream_data();
void rotate_avro_file(std::string fullname);
void client_callback();
};
void read_table_info(uint8_t* ptr,
uint8_t post_header_len,
uint64_t* table_id,
char* dest,
size_t len);
TableMapEvent* table_map_alloc(uint8_t* ptr, uint8_t hdr_len, TableCreateEvent* create);
STableCreateEvent table_create_alloc(char* ident, const char* sql, int len);
bool table_create_save(TableCreateEvent* create, const char* filename);
bool table_create_alter(TableCreateEvent* create, const char* sql, const char* end);
TableCreateEvent* table_create_from_schema(const char* file,
const char* db,
const char* table,
int version);
void read_table_identifier(const char* db,
const char* sql,
const char* end,
char* dest,
int size);
int avro_client_handle_request(Avro*, AvroSession*, GWBUF*);
void avro_client_rotate(Avro* router, AvroSession* client, uint8_t* ptr);
bool avro_open_binlog(const char* binlogdir, const char* file, int* fd);
void avro_close_binlog(int fd);
avro_binlog_end_t avro_read_all_events(Avro* router);
char* json_new_schema_from_table(const STableMapEvent& map, const STableCreateEvent& create);
bool handle_row_event(Avro* router, REP_HEADER* hdr, uint8_t* ptr);
void handle_one_event(Avro* router, uint8_t* ptr, REP_HEADER& hdr, uint64_t& pos);
REP_HEADER construct_header(uint8_t* ptr);
bool avro_save_conversion_state(Avro* router);
bool avro_load_conversion_state(Avro* router);
void avro_load_metadata_from_schemas(Avro* router);
void notify_all_clients(Avro* router);
MXS_END_DECLS