Clean up avrorouter.hh

Cleaned up the header and removed most of unused member variables.
This commit is contained in:
Markus Mäkelä
2018-05-22 09:46:47 +03:00
parent 8dd89548f4
commit 6898397d49
2 changed files with 97 additions and 164 deletions

View File

@ -484,12 +484,10 @@ createInstance(SERVICE *service, char **options)
} }
memset(&inst->stats, 0, sizeof(AVRO_ROUTER_STATS)); memset(&inst->stats, 0, sizeof(AVRO_ROUTER_STATS));
spinlock_init(&inst->fileslock);
inst->service = service; inst->service = service;
inst->binlog_fd = -1; inst->binlog_fd = -1;
inst->current_pos = 4; inst->current_pos = 4;
inst->binlog_position = 4; inst->binlog_position = 4;
inst->next = NULL;
inst->lastEventTimestamp = 0; inst->lastEventTimestamp = 0;
inst->binlog_position = 0; inst->binlog_position = 0;
inst->task_delay = 1; inst->task_delay = 1;
@ -681,12 +679,6 @@ newSession(MXS_ROUTER *instance, MXS_SESSION *session)
return NULL; return NULL;
} }
#if defined(SS_DEBUG)
client->rses_chk_top = CHK_NUM_ROUTER_SES;
client->rses_chk_tail = CHK_NUM_ROUTER_SES;
#endif
memset(&client->stats, 0, sizeof(AVRO_CLIENT_STATS));
atomic_add(&inst->stats.n_clients, 1); atomic_add(&inst->stats.n_clients, 1);
client->uuid = NULL; client->uuid = NULL;
spinlock_init(&client->catch_lock); spinlock_init(&client->catch_lock);
@ -715,8 +707,6 @@ newSession(MXS_ROUTER *instance, MXS_SESSION *session)
sqlite3_close_v2(client->sqlite_handle); sqlite3_close_v2(client->sqlite_handle);
} }
CHK_CLIENT_RSES(client);
return reinterpret_cast<MXS_ROUTER_SESSION*>(client); return reinterpret_cast<MXS_ROUTER_SESSION*>(client);
} }
@ -759,8 +749,6 @@ static void closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_sessio
Avro *router = (Avro *) instance; Avro *router = (Avro *) instance;
AvroSession *client = (AvroSession *) router_session; AvroSession *client = (AvroSession *) router_session;
CHK_CLIENT_RSES(client);
spinlock_acquire(&client->catch_lock); spinlock_acquire(&client->catch_lock);
spinlock_acquire(&client->file_lock); spinlock_acquire(&client->file_lock);
@ -801,17 +789,12 @@ static void
diagnostics(MXS_ROUTER *router, DCB *dcb) diagnostics(MXS_ROUTER *router, DCB *dcb)
{ {
Avro *router_inst = (Avro *) router; Avro *router_inst = (Avro *) router;
char buf[40];
struct tm tm;
dcb_printf(dcb, "\tAVRO Converter infofile: %s/%s\n", dcb_printf(dcb, "\tAVRO Converter infofile: %s/%s\n",
router_inst->avrodir, AVRO_PROGRESS_FILE); router_inst->avrodir, AVRO_PROGRESS_FILE);
dcb_printf(dcb, "\tAVRO files directory: %s\n", dcb_printf(dcb, "\tAVRO files directory: %s\n",
router_inst->avrodir); router_inst->avrodir);
localtime_r(&router_inst->stats.lastReply, &tm);
asctime_r(&tm, buf);
dcb_printf(dcb, "\tBinlog directory: %s\n", dcb_printf(dcb, "\tBinlog directory: %s\n",
router_inst->binlogdir); router_inst->binlogdir);
dcb_printf(dcb, "\tCurrent binlog file: %s\n", dcb_printf(dcb, "\tCurrent binlog file: %s\n",

View File

@ -1,6 +1,4 @@
#pragma once #pragma once
#ifndef _MXS_AVRO_H
#define _MXS_AVRO_H
/* /*
* Copyright (c) 2016 MariaDB Corporation Ab * Copyright (c) 2016 MariaDB Corporation Ab
* *
@ -38,7 +36,6 @@ MXS_BEGIN_DECLS
/** /**
* How often to call the router status function (seconds) * How often to call the router status function (seconds)
*/ */
#define AVRO_STATS_FREQ 60
#define AVRO_NSTATS_MINUTES 30 #define AVRO_NSTATS_MINUTES 30
/** /**
@ -71,22 +68,22 @@ MXS_BEGIN_DECLS
static const char *avro_client_states[] = { "Unregistered", "Registered", "Processing", "Errored" }; static const char *avro_client_states[] = { "Unregistered", "Registered", "Processing", "Errored" };
static const char *avro_client_client_mode[] = { "Catch-up", "Busy", "Wait_for_data" }; static const char *avro_client_client_mode[] = { "Catch-up", "Busy", "Wait_for_data" };
static const char *avro_domain = "domain"; static const char *avro_domain = "domain";
static const char *avro_server_id = "server_id"; static const char *avro_server_id = "server_id";
static const char *avro_sequence = "sequence"; static const char *avro_sequence = "sequence";
static const char *avro_event_number = "event_number"; static const char *avro_event_number = "event_number";
static const char *avro_event_type = "event_type"; static const char *avro_event_type = "event_type";
static const char *avro_timestamp = "timestamp"; static const char *avro_timestamp = "timestamp";
static const char *avro_client_ouput[] = { "Undefined", "JSON", "Avro" }; static const char *avro_client_ouput[] = { "Undefined", "JSON", "Avro" };
static inline bool is_reserved_word(const char* word) static inline bool is_reserved_word(const char* word)
{ {
return strcasecmp(word, avro_domain) == 0 || return strcasecmp(word, avro_domain) == 0 ||
strcasecmp(word, avro_server_id) == 0 || strcasecmp(word, avro_server_id) == 0 ||
strcasecmp(word, avro_sequence) == 0 || strcasecmp(word, avro_sequence) == 0 ||
strcasecmp(word, avro_event_number) == 0 || strcasecmp(word, avro_event_number) == 0 ||
strcasecmp(word, avro_event_type) == 0 || strcasecmp(word, avro_event_type) == 0 ||
strcasecmp(word, avro_timestamp) == 0; strcasecmp(word, avro_timestamp) == 0;
} }
static inline void fix_reserved_word(char *tok) static inline void fix_reserved_word(char *tok)
@ -122,13 +119,13 @@ typedef enum avro_binlog_end
typedef struct table_create typedef struct table_create
{ {
uint64_t columns; uint64_t columns;
char **column_names; char** column_names;
char **column_types; char** column_types;
int* column_lengths; int* column_lengths;
char *table; char* table;
char *database; char* database;
int version; /**< How many versions of this table have been used */ int version; /**< How many versions of this table have been used */
bool was_used; /**< Has this schema been persisted to disk */ bool was_used; /**< Has this schema been persisted to disk */
} TABLE_CREATE; } TABLE_CREATE;
/** A representation of a table map event read from a binary log. A table map /** A representation of a table map event read from a binary log. A table map
@ -137,18 +134,18 @@ typedef struct table_create
* some meta information on the columns. */ * some meta information on the columns. */
typedef struct table_map typedef struct table_map
{ {
uint64_t id; uint64_t id;
uint64_t columns; uint64_t columns;
uint16_t flags; uint16_t flags;
uint8_t *column_types; uint8_t* column_types;
uint8_t *null_bitmap; uint8_t* null_bitmap;
uint8_t *column_metadata; uint8_t* column_metadata;
size_t column_metadata_size; size_t column_metadata_size;
TABLE_CREATE *table_create; /*< The definition of the table */ TABLE_CREATE* table_create; /*< The definition of the table */
int version; int version;
char version_string[TABLE_MAP_VERSION_DIGITS + 1]; char version_string[TABLE_MAP_VERSION_DIGITS + 1];
char *table; char* table;
char *database; char* database;
} TABLE_MAP; } TABLE_MAP;
/** /**
@ -156,40 +153,16 @@ typedef struct table_map
*/ */
typedef struct typedef struct
{ {
int n_clients; /*< Number slave sessions created */ int n_clients; /*< Number client sessions created */
int n_reads; /*< Number of record reads */
uint64_t n_binlogs; /*< Number of binlog records from master */
uint64_t n_rotates; /*< Number of binlog rotate events */
int n_masterstarts; /*< Number of times connection restarted */
time_t lastReply;
uint64_t events[MAX_EVENT_TYPE_END + 1]; /*< Per event counters */
uint64_t lastsample;
int minno;
int minavgs[AVRO_NSTATS_MINUTES];
} AVRO_ROUTER_STATS; } AVRO_ROUTER_STATS;
/**
* Client statistics
*/
typedef struct
{
int n_events; /*< Number of events sent */
unsigned long n_bytes; /*< Number of bytes sent */
int n_requests; /*< Number of requests received */
int n_queries; /*< Number of queries */
int n_failed_read;
uint64_t lastsample;
int minno;
int minavgs[AVRO_NSTATS_MINUTES];
} AVRO_CLIENT_STATS;
typedef struct avro_table_t typedef struct avro_table_t
{ {
char* filename; /*< Absolute filename */ char* filename; /*< Absolute filename */
char* json_schema; /*< JSON representation of the schema */ char* json_schema; /*< JSON representation of the schema */
avro_file_writer_t avro_file; /*< Current Avro data file */ avro_file_writer_t avro_file; /*< Current Avro data file */
avro_value_iface_t *avro_writer_iface; /*< Avro C API writer interface */ avro_value_iface_t* avro_writer_iface; /*< Avro C API writer interface */
avro_schema_t avro_schema; /*< Native Avro schema of the table */ avro_schema_t avro_schema; /*< Native Avro schema of the table */
} AVRO_TABLE; } AVRO_TABLE;
/** Data format used when streaming data to the clients */ /** Data format used when streaming data to the clients */
@ -204,100 +177,79 @@ enum mxs_avro_codec_type
{ {
MXS_AVRO_CODEC_NULL, MXS_AVRO_CODEC_NULL,
MXS_AVRO_CODEC_DEFLATE, MXS_AVRO_CODEC_DEFLATE,
MXS_AVRO_CODEC_SNAPPY, /**< Not yet implemented */ MXS_AVRO_CODEC_SNAPPY, /**< Not yet implemented */
} ; } ;
typedef struct gtid_pos typedef struct gtid_pos
{ {
uint32_t timestamp; /*< GTID event timestamp */ uint32_t timestamp; /*< GTID event timestamp */
uint64_t domain; /*< Replication domain */ uint64_t domain; /*< Replication domain */
uint64_t server_id; /*< Server ID */ uint64_t server_id; /*< Server ID */
uint64_t seq; /*< Sequence number */ uint64_t seq; /*< Sequence number */
uint64_t event_num; /*< Subsequence number, increases monotonically. This uint64_t event_num; /*< Subsequence number, increases monotonically. This
* is an internal representation of the position of * is an internal representation of the position of
* an event inside a GTID event and it is used to * an event inside a GTID event and it is used to
* rebuild GTID events in the correct order. */ * rebuild GTID events in the correct order. */
} gtid_pos_t; } gtid_pos_t;
struct Avro;
/**
* The client structure used within this router.
* This represents the clients that are requesting AVRO files from MaxScale.
*/
struct AvroSession
{
#if defined(SS_DEBUG)
skygw_chk_t rses_chk_top;
#endif
DCB *dcb; /*< The client DCB */
int state; /*< The state of this client */
enum avro_data_format format; /*< Stream JSON or Avro data */
char *uuid; /*< Client UUID */
SPINLOCK catch_lock; /*< Event catchup lock */
SPINLOCK file_lock; /*< Protects rses_deleted */
Avro* router; /*< Pointer to the owning router */
MAXAVRO_FILE *file_handle; /*< Current open file handle */
uint64_t last_sent_pos; /*< The last record we sent */
AVRO_CLIENT_STATS stats; /*< Slave statistics */
time_t connect_time; /*< Connect time of slave */
MAXAVRO_FILE avro_file; /*< Avro file struct */
char avro_binfile[AVRO_MAX_FILENAME_LEN + 1];
bool requested_gtid; /*< If the client requested */
gtid_pos_t gtid; /*< Current/requested GTID */
gtid_pos_t gtid_start; /*< First sent GTID */
unsigned int cstate; /*< Catch up state */
sqlite3 *sqlite_handle;
#if defined(SS_DEBUG)
skygw_chk_t rses_chk_tail;
#endif
};
/**
* * The per instance data for the AVRO router.
* */
struct Avro struct Avro
{ {
SERVICE *service; /*< Pointer to the service using this router */ SERVICE* service; /*< Pointer to the service using this router */
int initbinlog; /*< Initial binlog file number */ int initbinlog; /*< Initial binlog file number */
char *fileroot; /*< Root of binlog filename */ char* fileroot; /*< Root of binlog filename */
unsigned int state; /*< State of the AVRO router */ unsigned int state; /*< State of the AVRO router */
uint8_t lastEventReceived; /*< Last even received */ uint8_t lastEventReceived; /*< Last even received */
uint32_t lastEventTimestamp; /*< Timestamp from last event */ uint32_t lastEventTimestamp; /*< Timestamp from last event */
char *binlogdir; /*< The directory where the binlog files are stored */ char* binlogdir; /*< The directory where the binlog files are stored */
char *avrodir; /*< The directory with the AVRO files */ char* avrodir; /*< The directory with the AVRO files */
char binlog_name[BINLOG_FNAMELEN + 1]; char binlog_name[BINLOG_FNAMELEN + 1]; /*< Name of the current binlog file */
/*< Name of the current binlog file */ uint64_t binlog_position; /*< last committed transaction position */
uint64_t binlog_position; uint64_t current_pos; /*< Current binlog position */
/*< last committed transaction position */ int binlog_fd; /*< File descriptor of the binlog file being read */
uint64_t current_pos; pcre2_code* create_table_re;
/*< Current binlog position */ pcre2_code* alter_table_re;
int binlog_fd; /*< File descriptor of the binlog file being read */ uint8_t event_types;
pcre2_code *create_table_re; uint8_t event_type_hdr_lens[MAX_EVENT_TYPE_END];
pcre2_code *alter_table_re; uint8_t binlog_checksum;
uint8_t event_types; gtid_pos_t gtid;
uint8_t event_type_hdr_lens[MAX_EVENT_TYPE_END]; TABLE_MAP* active_maps[MAX_MAPPED_TABLES];
uint8_t binlog_checksum; HASHTABLE* table_maps;
gtid_pos_t gtid; HASHTABLE* open_tables;
TABLE_MAP *active_maps[MAX_MAPPED_TABLES]; HASHTABLE* created_tables;
HASHTABLE *table_maps; sqlite3* sqlite_handle;
HASHTABLE *open_tables; char prevbinlog[BINLOG_FNAMELEN + 1];
HASHTABLE *created_tables; int rotating; /*< Rotation in progress flag */
sqlite3 *sqlite_handle; AVRO_ROUTER_STATS stats; /*< Statistics for this router */
char prevbinlog[BINLOG_FNAMELEN + 1]; int task_delay; /*< Delay in seconds until the next conversion takes place */
int rotating; /*< Rotation in progress flag */ uint64_t trx_count; /*< Transactions processed */
SPINLOCK fileslock; /*< Lock for the files queue above */ uint64_t trx_target; /*< Minimum about of transactions that will trigger
AVRO_ROUTER_STATS stats; /*< Statistics for this router */ * a flush of all tables */
int task_delay; /*< Delay in seconds until the next conversion takes place */ uint64_t row_count; /*< Row events processed */
uint64_t trx_count; /*< Transactions processed */ uint64_t row_target; /*< Minimum about of row events that will trigger
uint64_t trx_target; /*< Minimum about of transactions that will trigger * a flush of all tables */
* a flush of all tables */ uint64_t block_size; /**< Avro datablock size */
uint64_t row_count; /*< Row events processed */
uint64_t row_target; /*< Minimum about of row events that will trigger
* a flush of all tables */
uint64_t block_size; /**< Avro datablock size */
enum mxs_avro_codec_type codec; /**< Avro codec type, defaults to `null` */ enum mxs_avro_codec_type codec; /**< Avro codec type, defaults to `null` */
struct avro_instance *next; };
struct AvroSession
{
DCB* dcb; /*< The client DCB */
int state; /*< The state of this client */
enum avro_data_format format; /*< Stream JSON or Avro data */
char* uuid; /*< Client UUID */
SPINLOCK catch_lock; /*< Event catchup lock */
SPINLOCK file_lock; /*< Protects rses_deleted */
Avro* router; /*< Pointer to the owning router */
MAXAVRO_FILE* file_handle; /*< Current open file handle */
uint64_t last_sent_pos; /*< The last record we sent */
time_t connect_time; /*< Connect time of slave */
MAXAVRO_FILE avro_file; /*< Avro file struct */
char avro_binfile[AVRO_MAX_FILENAME_LEN + 1];
bool requested_gtid; /*< If the client requested */
gtid_pos_t gtid; /*< Current/requested GTID */
gtid_pos_t gtid_start; /*< First sent GTID */
unsigned int cstate; /*< Catch up state */
sqlite3* sqlite_handle;
}; };
extern void read_table_info(uint8_t *ptr, uint8_t post_header_len, uint64_t *table_id, extern void read_table_info(uint8_t *ptr, uint8_t post_header_len, uint64_t *table_id,
@ -350,5 +302,3 @@ extern void avro_flush_all_tables(Avro *router, enum avrorouter_file_op flush);
#define AVRO_WAIT_DATA 0x0002 #define AVRO_WAIT_DATA 0x0002
MXS_END_DECLS MXS_END_DECLS
#endif