/* * Copyright (c) 2018 MariaDB Corporation Ab * * Use of this software is governed by the Business Source License included * in the LICENSE.TXT file and at www.mariadb.com/bsl11. * * Change Date: 2022-01-01 * * On the date above, in accordance with the Business Source License, use * of this software will be governed by version 2 or later of the General * Public License. */ #pragma once #define MXS_MODULE_NAME "avrorouter" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "rpl.hh" MXS_BEGIN_DECLS /** Name of the file where the binlog to Avro conversion progress is stored */ #define AVRO_PROGRESS_FILE "avro-conversion.ini" static const char* avro_client_states[] = {"Unregistered", "Registered", "Processing", "Errored"}; static const char* avro_client_client_mode[] = {"Catch-up", "Busy", "Wait_for_data"}; static const char* avro_domain = "domain"; static const char* avro_server_id = "server_id"; static const char* avro_sequence = "sequence"; static const char* avro_event_number = "event_number"; static const char* avro_event_type = "event_type"; static const char* avro_timestamp = "timestamp"; static const char* avro_client_ouput[] = {"Undefined", "JSON", "Avro"}; static inline bool is_reserved_word(const char* word) { return strcasecmp(word, avro_domain) == 0 || strcasecmp(word, avro_server_id) == 0 || strcasecmp(word, avro_sequence) == 0 || strcasecmp(word, avro_event_number) == 0 || strcasecmp(word, avro_event_type) == 0 || strcasecmp(word, avro_timestamp) == 0; } static inline void fix_reserved_word(char* tok) { if (is_reserved_word(tok)) { strcat(tok, "_"); } } /** How a binlog file is closed */ typedef enum avro_binlog_end { AVRO_OK = 0, /**< A newer binlog file exists with a rotate event to that file */ AVRO_LAST_FILE, /**< Last binlog which is closed */ AVRO_OPEN_TRANSACTION, /**< The binlog ends with an open transaction */ AVRO_BINLOG_ERROR /**< An error occurred while processing the binlog file */ } avro_binlog_end_t; /** How many numbers each table version has (db.table.000001.avro) */ #define TABLE_MAP_VERSION_DIGITS 6 /** Maximum version number*/ #define TABLE_MAP_VERSION_MAX 999999 /** Maximum column name length */ #define TABLE_MAP_MAX_NAME_LEN 64 /** How many bytes each thread tries to send */ #define AVRO_DATA_BURST_SIZE (32 * 1024) /** Data format used when streaming data to the clients */ enum avro_data_format { AVRO_FORMAT_UNDEFINED, AVRO_FORMAT_JSON, AVRO_FORMAT_AVRO, }; enum mxs_avro_codec_type { MXS_AVRO_CODEC_NULL, MXS_AVRO_CODEC_DEFLATE, MXS_AVRO_CODEC_SNAPPY, /**< Not yet implemented */ }; static const MXS_ENUM_VALUE codec_values[] = { {"null", MXS_AVRO_CODEC_NULL }, {"deflate", MXS_AVRO_CODEC_DEFLATE}, // Not yet implemented // {"snappy", MXS_AVRO_CODEC_SNAPPY}, {NULL} }; class Avro : public MXS_ROUTER { Avro(const Avro&) = delete; Avro& operator=(const Avro&) = delete; public: static Avro* create(SERVICE* service, SRowEventHandler handler); SERVICE* service; /*< Pointer to the service using this router */ std::string filestem; /*< Root of binlog filename */ std::string binlogdir; /*< The directory where the binlog files are stored */ std::string avrodir; /*< The directory with the AVRO files */ std::string binlog_name;/*< Name of the current binlog file */ uint64_t current_pos;/*< Current binlog position */ int binlog_fd; /*< File descriptor of the binlog file being read */ uint64_t trx_count; /*< Transactions processed */ uint64_t trx_target; /*< Number of transactions that trigger a flush */ uint64_t row_count; /*< Row events processed */ uint64_t row_target; /*< Number of row events that trigger a flush */ uint32_t task_handle;/**< Delayed task handle */ Rpl handler; private: Avro(SERVICE* service, MXS_CONFIG_PARAMETER* params, SERVICE* source, SRowEventHandler handler); void read_source_service_options(SERVICE* source); }; class AvroSession : public MXS_ROUTER_SESSION { AvroSession(const AvroSession&) = delete; AvroSession& operator=(const AvroSession&) = delete; public: static AvroSession* create(Avro* router, MXS_SESSION* session); ~AvroSession(); DCB* dcb; /*< The client DCB */ int state; /*< The state of this client */ enum avro_data_format format; /*< Stream JSON or Avro data */ std::string uuid; /*< Client UUID */ Avro* router; /*< Pointer to the owning router */ MAXAVRO_FILE* file_handle; /*< Current open file handle */ uint64_t last_sent_pos;/*< The last record we sent */ time_t connect_time; /*< Connect time of slave */ std::string avro_binfile; bool requested_gtid; /*< If the client requested */ gtid_pos_t gtid; /*< Current/requested GTID */ gtid_pos_t gtid_start; /*< First sent GTID */ /** * Process a client request * * @param Buffer The incoming request packet * * @return 1 on success, 0 on error */ int routeQuery(GWBUF* buffer); /** * Handler for the EPOLLOUT event */ void client_callback(); private: AvroSession(Avro* instance, MXS_SESSION* session); int do_registration(GWBUF* data); void process_command(GWBUF* queue); void send_gtid_info(gtid_pos_t* gtid_pos); void set_current_gtid(json_t* row); bool stream_json(); bool stream_binary(); bool seek_to_gtid(); bool stream_data(); void rotate_avro_file(std::string fullname); }; void read_table_info(uint8_t* ptr, uint8_t post_header_len, uint64_t* table_id, char* dest, size_t len); TableMapEvent* table_map_alloc(uint8_t* ptr, uint8_t hdr_len, TableCreateEvent* create); STableCreateEvent table_create_alloc(char* ident, const char* sql, int len); bool table_create_save(TableCreateEvent* create, const char* filename); bool table_create_alter(TableCreateEvent* create, const char* sql, const char* end); TableCreateEvent* table_create_from_schema(const char* file, const char* db, const char* table, int version); void read_table_identifier(const char* db, const char* sql, const char* end, char* dest, int size); int avro_client_handle_request(Avro*, AvroSession*, GWBUF*); void avro_client_rotate(Avro* router, AvroSession* client, uint8_t* ptr); bool avro_open_binlog(const char* binlogdir, const char* file, int* fd); void avro_close_binlog(int fd); avro_binlog_end_t avro_read_all_events(Avro* router); char* json_new_schema_from_table(const STableMapEvent& map, const STableCreateEvent& create); bool handle_row_event(Avro* router, REP_HEADER* hdr, uint8_t* ptr); void handle_one_event(Avro* router, uint8_t* ptr, REP_HEADER& hdr, uint64_t& pos); REP_HEADER construct_header(uint8_t* ptr); bool avro_save_conversion_state(Avro* router); bool avro_load_conversion_state(Avro* router); void avro_load_metadata_from_schemas(Avro* router); void notify_all_clients(Avro* router); MXS_END_DECLS