MXS-1881: Abstract the final event processing

The final part of the row event processing is now done by an
implementation of the EventConverter class. This makes the implementation
of different storage types easier as only the actual storage operation
needs to be implemented.
This commit is contained in:
Markus Mäkelä
2018-05-30 01:58:45 +03:00
parent c56b2063aa
commit f574703f8e

View File

@ -30,12 +30,191 @@ static bool warn_bit = false; /**< Remove when support for BIT is added */
static bool warn_large_enumset = false; /**< Remove when support for ENUM/SET values
* larger than 255 is added */
uint8_t* process_row_event_data(TableMapEvent *map, TableCreateEvent *create,
avro_value_t *record, uint8_t *ptr,
uint8_t *columns_present, uint8_t *end);
void notify_all_clients(Avro *router);
void add_used_table(Avro* router, const char* table);
/**
 * Abstract receiver for the values of a row event
 *
 * An implementation is handed one row at a time: prepare() starts a row,
 * one column() overload is invoked per column value and commit() finishes
 * the row. Only the storage-specific part needs to be written by a subclass.
 *
 * The table map and table create events are kept as references, so the
 * objects given to the constructor must stay valid for as long as the
 * converter is in use.
 */
class EventConverter
{
public:
    /**
     * @param map    Table map event of the table being processed
     * @param create Table create event of the table being processed
     */
    EventConverter(const STableMapEvent& map, const STableCreateEvent& create)
        : m_map(map)
        , m_create(create)
    {}

    virtual ~EventConverter() {}

    /**
     * Prepare a new row for processing
     *
     * @param gtid       GTID position of the row
     * @param hdr        Replication header of the event
     * @param event_type Type of the event
     */
    virtual void prepare(const gtid_pos_t& gtid, const REP_HEADER& hdr, int event_type) = 0;

    /**
     * Called once all columns are processed
     *
     * @return Boolean result reported by the implementation
     */
    virtual bool commit() = 0;

    /** 32-bit integer handler, `i` is the index of the column */
    virtual void column(int i, int32_t value) = 0;

    /** 64-bit integer handler */
    virtual void column(int i, int64_t value) = 0;

    /** Float handler */
    virtual void column(int i, float value) = 0;

    /** Double handler */
    virtual void column(int i, double value) = 0;

    /** String handler */
    virtual void column(int i, std::string value) = 0;

    /** Bytes handler, `len` is the number of bytes in `value` */
    virtual void column(int i, uint8_t* value, int len) = 0;

    /** Empty (NULL) value type handler */
    virtual void column(int i) = 0;

protected:
    const STableMapEvent&    m_map;    /**< Table map event, not owned */
    const STableCreateEvent& m_create; /**< Table create event, not owned */
};
/**
 * EventConverter implementation that stores rows in an Avro file
 *
 * A single Avro record value is created in the constructor and reused for
 * every row: prepare() and the column() overloads fill in its fields and
 * commit() appends it to the file writer of the table.
 *
 * The converter keeps a reference to the file writer inside the object that
 * `table` points to, so that object must outlive the converter.
 */
class AvroConverter : public EventConverter
{
public:
    /**
     * @param map    Table map event of the table being processed
     * @param create Table create event of the table being processed
     * @param table  Avro table that provides the writer interface and file writer
     */
    AvroConverter(const STableMapEvent& map, const STableCreateEvent& create, SAvroTable table):
        EventConverter(map, create),
        m_writer_iface(table->avro_writer_iface),
        m_avro_file(table->avro_file)
    {
        // A constructor cannot report the failure to the caller, so at least
        // leave a trace in the log instead of silently ignoring the result.
        if (avro_generic_value_new(m_writer_iface, &m_record))
        {
            MXS_ERROR("Failed to create Avro record: %s", avro_strerror());
        }
    }

    ~AvroConverter()
    {
        avro_value_decref(&m_record);
    }

    /**
     * Store the GTID, timestamp and event type of a new row in the record
     *
     * @param gtid       GTID position to store, it is only read
     * @param hdr        Replication header, source of the timestamp
     * @param event_type Event type, stored as an Avro enum value
     */
    void prepare(const gtid_pos_t& gtid, const REP_HEADER& hdr, int event_type)
    {
        avro_value_get_by_name(&m_record, avro_domain, &m_field, NULL);
        avro_value_set_int(&m_field, gtid.domain);

        avro_value_get_by_name(&m_record, avro_server_id, &m_field, NULL);
        avro_value_set_int(&m_field, gtid.server_id);

        avro_value_get_by_name(&m_record, avro_sequence, &m_field, NULL);
        avro_value_set_int(&m_field, gtid.seq);

        avro_value_get_by_name(&m_record, avro_event_number, &m_field, NULL);
        avro_value_set_int(&m_field, gtid.event_num);

        avro_value_get_by_name(&m_record, avro_timestamp, &m_field, NULL);
        avro_value_set_int(&m_field, hdr.timestamp);

        avro_value_get_by_name(&m_record, avro_event_type, &m_field, NULL);
        avro_value_set_enum(&m_field, event_type);
    }

    /**
     * Called once all columns are processed, appends the record to the file
     *
     * @return True if the record was appended, false if the append failed
     */
    bool commit()
    {
        bool rval = true;

        if (avro_file_writer_append_value(m_avro_file, &m_record))
        {
            MXS_ERROR("Failed to write value: %s", avro_strerror());
            rval = false;
        }

        return rval;
    }

    // 32-bit integer handler
    void column(int i, int32_t value)
    {
        set_active(i);
        avro_value_set_int(&m_field, value);
    }

    // 64-bit integer handler
    void column(int i, int64_t value)
    {
        set_active(i);
        avro_value_set_long(&m_field, value);
    }

    // Float handler
    void column(int i, float value)
    {
        set_active(i);
        avro_value_set_float(&m_field, value);
    }

    // Double handler
    void column(int i, double value)
    {
        set_active(i);
        avro_value_set_double(&m_field, value);
    }

    // String handler
    void column(int i, std::string value)
    {
        set_active(i);
        avro_value_set_string(&m_field, value.c_str());
    }

    // Bytes handler
    void column(int i, uint8_t* value, int len)
    {
        set_active(i);
        avro_value_set_bytes(&m_field, value, len);
    }

    // Empty (NULL) value type handler
    void column(int i)
    {
        set_active(i);

        if (column_is_blob(m_map->column_types[i]))
        {
            // Blob columns are stored as one zero byte rather than as a null
            uint8_t nullvalue = 0;
            avro_value_set_bytes(&m_field, &nullvalue, 1);
        }
        else
        {
            avro_value_set_null(&m_field);
        }
    }

private:
    avro_value_iface_t* m_writer_iface; /**< Interface used to create m_record */
    avro_file_writer_t& m_avro_file;    /**< File writer of the table, not owned */
    avro_value_t        m_record;       /**< Record reused for every row */
    avro_value_t        m_field;        /**< Field of m_record selected last */

    // Not copyable: a copy would share m_record and both destructors would
    // release it. Declared but intentionally left undefined.
    AvroConverter(const AvroConverter&);
    AvroConverter& operator=(const AvroConverter&);

    /** Point m_field at the record field named after column `i` */
    void set_active(int i)
    {
        ss_debug(int rc =)avro_value_get_by_name(&m_record, m_create->columns[i].name.c_str(),
                                                 &m_field, NULL);
        ss_dassert(rc == 0);
    }
};
uint8_t* process_row_event_data(STableMapEvent map, STableCreateEvent create, EventConverter* conv,
uint8_t *ptr, uint8_t *columns_present, uint8_t *end);
/**
* @brief Get row event name
* @param event Event type
@ -181,41 +360,6 @@ bool handle_table_map_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr)
return rval;
}
/**
 * @brief Fill in the fields that every record carries
 *
 * Stores the domain, server ID and sequence of the router's current GTID,
 * then advances the GTID event number by one and stores the new value.
 * Finally the event timestamp and the event type are stored.
 *
 * @param router     Avro router instance, its GTID event number is incremented
 * @param hdr        Replication header, source of the timestamp
 * @param event_type Event type, stored as an enum value
 * @param record     Record to prepare
 */
static void prepare_record(Avro *router, REP_HEADER *hdr,
                           int event_type, avro_value_t *record)
{
    avro_value_t domain;
    avro_value_get_by_name(record, avro_domain, &domain, NULL);
    avro_value_set_int(&domain, router->gtid.domain);

    avro_value_t server_id;
    avro_value_get_by_name(record, avro_server_id, &server_id, NULL);
    avro_value_set_int(&server_id, router->gtid.server_id);

    avro_value_t sequence;
    avro_value_get_by_name(record, avro_sequence, &sequence, NULL);
    avro_value_set_int(&sequence, router->gtid.seq);

    // The event number is advanced before it is written to the record
    avro_value_t event_number;
    avro_value_get_by_name(record, avro_event_number, &event_number, NULL);
    avro_value_set_int(&event_number, ++router->gtid.event_num);

    avro_value_t timestamp;
    avro_value_get_by_name(record, avro_timestamp, &timestamp, NULL);
    avro_value_set_int(&timestamp, hdr->timestamp);

    avro_value_t type;
    avro_value_get_by_name(record, avro_event_type, &type, NULL);
    avro_value_set_enum(&type, event_type);
}
/**
* @brief Handle a single RBR row event
*
@ -292,7 +436,7 @@ bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr)
if (it != router->active_maps.end())
{
TableMapEvent* map = it->second.get();
STableMapEvent map = it->second;
char table_ident[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 2];
snprintf(table_ident, sizeof(table_ident), "%s.%s", map->database.c_str(), map->table.c_str());
SAvroTable table;
@ -308,8 +452,7 @@ bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr)
if (table && create != router->created_tables.end() &&
ncolumns == map->columns() && create->second->columns.size() == map->columns())
{
avro_value_t record;
avro_generic_value_new(table->avro_writer_iface, &record);
AvroConverter conv(map, create->second, table);
/** Each event has one or more rows in it. The number of rows is not known
* beforehand so we must continue processing them until we reach the end
@ -320,38 +463,29 @@ bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr)
while (ptr < end)
{
static uint64_t total_row_count = 1;
MXS_INFO("Row %lu", total_row_count++);
/** Add the current GTID and timestamp */
int event_type = get_event_type(hdr->event_type);
prepare_record(router, hdr, event_type, &record);
ptr = process_row_event_data(map, create->second.get(), &record, ptr, col_present, end);
if (avro_file_writer_append_value(table->avro_file, &record))
{
MXS_ERROR("Failed to write value at position %ld: %s",
router->current_pos, avro_strerror());
}
// Increment the event count for this transaction
router->gtid.event_num++;
conv.prepare(router->gtid, *hdr, event_type);
ptr = process_row_event_data(map, create->second, &conv, ptr, col_present, end);
conv.commit();
/** Update rows events have the before and after images of the
* affected rows so we'll process them as another record with
* a different type */
if (event_type == UPDATE_EVENT)
{
prepare_record(router, hdr, UPDATE_EVENT_AFTER, &record);
ptr = process_row_event_data(map, create->second.get(), &record, ptr, col_present, end);
if (avro_file_writer_append_value(table->avro_file, &record))
{
MXS_ERROR("Failed to write value at position %ld: %s",
router->current_pos, avro_strerror());
}
conv.prepare(router->gtid, *hdr, UPDATE_EVENT_AFTER);
ptr = process_row_event_data(map, create->second, &conv, ptr, col_present, end);
conv.commit();
}
rows++;
}
add_used_table(router, table_ident);
avro_value_decref(&record);
rval = true;
}
else if (!table)
@ -390,28 +524,30 @@ bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr)
/**
* @brief Unpack numeric types
*
* Convert the stored value into an Avro value and pack it in the record.
* Convert the raw binary data into actual numeric types.
*
* @param field Avro value in a record
* @param type Type of the field
* @param conv Event converter to use
* @param idx Position of this column in the row
* @param type Event type
* @param metadata Field metadata
* @param value Pointer to the start of the in-memory representation of the data
* @param value Pointer to the start of the in-memory representation of the data
*/
void set_numeric_field_value(avro_value_t *field, uint8_t type, uint8_t *metadata, uint8_t *value)
void set_numeric_field_value(EventConverter* conv, int idx, uint8_t type,
uint8_t *metadata, uint8_t *value)
{
switch (type)
{
case TABLE_COL_TYPE_TINY:
{
char c = *value;
avro_value_set_int(field, c);
conv->column(idx, c);
break;
}
case TABLE_COL_TYPE_SHORT:
{
short s = gw_mysql_get_byte2(value);
avro_value_set_int(field, s);
conv->column(idx, s);
break;
}
@ -424,21 +560,21 @@ void set_numeric_field_value(avro_value_t *field, uint8_t type, uint8_t *metadat
x = -((0xffffff & (~x)) + 1);
}
avro_value_set_int(field, x);
conv->column(idx, x);
break;
}
case TABLE_COL_TYPE_LONG:
{
int x = gw_mysql_get_byte4(value);
avro_value_set_int(field, x);
conv->column(idx, x);
break;
}
case TABLE_COL_TYPE_LONGLONG:
{
long l = gw_mysql_get_byte8(value);
avro_value_set_long(field, l);
conv->column(idx, l);
break;
}
@ -446,7 +582,7 @@ void set_numeric_field_value(avro_value_t *field, uint8_t type, uint8_t *metadat
{
float f = 0;
memcpy(&f, value, 4);
avro_value_set_float(field, f);
conv->column(idx, f);
break;
}
@ -454,7 +590,7 @@ void set_numeric_field_value(avro_value_t *field, uint8_t type, uint8_t *metadat
{
double d = 0;
memcpy(&d, value, 8);
avro_value_set_double(field, d);
conv->column(idx, d);
break;
}
@ -556,11 +692,10 @@ static bool all_fields_null(uint8_t* null_bitmap, int ncolumns)
* this row event. Currently this should be a bitfield which has all bits set.
* @return Pointer to the first byte after the current row event
*/
uint8_t* process_row_event_data(TableMapEvent *map, TableCreateEvent *create, avro_value_t *record,
uint8_t* process_row_event_data(STableMapEvent map, STableCreateEvent create, EventConverter* conv,
uint8_t *ptr, uint8_t *columns_present, uint8_t *end)
{
int npresent = 0;
avro_value_t field;
long ncolumns = map->columns();
uint8_t *metadata = &map->column_metadata[0];
size_t metadata_offset = 0;
@ -579,25 +714,13 @@ uint8_t* process_row_event_data(TableMapEvent *map, TableCreateEvent *create, av
for (long i = 0; i < ncolumns && npresent < ncolumns; i++)
{
ss_debug(int rc = )avro_value_get_by_name(record, create->columns[i].name.c_str(),
&field, NULL);
ss_dassert(rc == 0);
if (bit_is_set(columns_present, ncolumns, i))
{
npresent++;
if (bit_is_set(null_bitmap, ncolumns, i))
{
sprintf(trace[i], "[%ld] NULL", i);
if (column_is_blob(map->column_types[i]))
{
uint8_t nullvalue = 0;
avro_value_set_bytes(&field, &nullvalue, 1);
}
else
{
avro_value_set_null(&field);
}
conv->column(i);
}
else if (column_is_fixed_string(map->column_types[i]))
{
@ -609,7 +732,7 @@ uint8_t* process_row_event_data(TableMapEvent *map, TableCreateEvent *create, av
uint64_t bytes = unpack_enum(ptr, &metadata[metadata_offset], val);
char strval[bytes * 2 + 1];
gw_bin2hex(strval, val, bytes);
avro_value_set_string(&field, strval);
conv->column(i, strval);
sprintf(trace[i], "[%ld] ENUM: %lu bytes", i, bytes);
ptr += bytes;
check_overflow(ptr <= end);
@ -646,14 +769,13 @@ uint8_t* process_row_event_data(TableMapEvent *map, TableCreateEvent *create, av
char str[bytes + 1];
memcpy(str, ptr, bytes);
str[bytes] = '\0';
avro_value_set_string(&field, str);
conv->column(i, str);
ptr += bytes;
check_overflow(ptr <= end);
}
}
else if (column_is_bit(map->column_types[i]))
{
uint64_t value = 0;
uint8_t len = metadata[metadata_offset + 1];
uint8_t bit_len = metadata[metadata_offset] > 0 ? 1 : 0;
size_t bytes = len + bit_len;
@ -664,7 +786,7 @@ uint8_t* process_row_event_data(TableMapEvent *map, TableCreateEvent *create, av
warn_bit = true;
MXS_WARNING("BIT is not currently supported, values are stored as 0.");
}
avro_value_set_int(&field, value);
conv->column(i, 0);
sprintf(trace[i], "[%ld] BIT", i);
ptr += bytes;
check_overflow(ptr <= end);
@ -673,7 +795,7 @@ uint8_t* process_row_event_data(TableMapEvent *map, TableCreateEvent *create, av
{
double f_value = 0.0;
ptr += unpack_decimal_field(ptr, metadata + metadata_offset, &f_value);
avro_value_set_double(&field, f_value);
conv->column(i, f_value);
sprintf(trace[i], "[%ld] DECIMAL", i);
check_overflow(ptr <= end);
}
@ -697,7 +819,7 @@ uint8_t* process_row_event_data(TableMapEvent *map, TableCreateEvent *create, av
memcpy(buf, ptr, sz);
buf[sz] = '\0';
ptr += sz;
avro_value_set_string(&field, buf);
conv->column(i, buf);
check_overflow(ptr <= end);
}
else if (column_is_blob(map->column_types[i]))
@ -709,13 +831,13 @@ uint8_t* process_row_event_data(TableMapEvent *map, TableCreateEvent *create, av
sprintf(trace[i], "[%ld] BLOB: field: %d bytes, data: %lu bytes", i, bytes, len);
if (len)
{
avro_value_set_bytes(&field, ptr, len);
conv->column(i, ptr, len);
ptr += len;
}
else
{
uint8_t nullvalue = 0;
avro_value_set_bytes(&field, &nullvalue, 1);
conv->column(i, &nullvalue, 1);
}
check_overflow(ptr <= end);
}
@ -727,7 +849,7 @@ uint8_t* process_row_event_data(TableMapEvent *map, TableCreateEvent *create, av
&metadata[metadata_offset],
create->columns[i].length, &tm);
format_temporal_value(buf, sizeof(buf), map->column_types[i], &tm);
avro_value_set_string(&field, buf);
conv->column(i, buf);
sprintf(trace[i], "[%ld] %s: %s", i, column_type_to_string(map->column_types[i]), buf);
check_overflow(ptr <= end);
}
@ -738,7 +860,7 @@ uint8_t* process_row_event_data(TableMapEvent *map, TableCreateEvent *create, av
memset(lval, 0, sizeof(lval));
ptr += unpack_numeric_field(ptr, map->column_types[i],
&metadata[metadata_offset], lval);
set_numeric_field_value(&field, map->column_types[i], &metadata[metadata_offset], lval);
set_numeric_field_value(conv, i, map->column_types[i], &metadata[metadata_offset], lval);
sprintf(trace[i], "[%ld] %s", i, column_type_to_string(map->column_types[i]));
check_overflow(ptr <= end);
}