MXS-1881: Remove file indexing

The file indexing provided very little benefit for the intended purpose of
the router. Removing it makes the whole system more robust and simplifies
the code by a large amount.
This commit is contained in:
Markus Mäkelä 2018-06-08 12:43:28 +03:00
parent 037498675f
commit 8f76cf5f5a
No known key found for this signature in database
GPG Key ID: 72D48FCE664F7B19
7 changed files with 12 additions and 372 deletions

View File

@ -4,7 +4,7 @@ if(AVRO_FOUND AND JANSSON_FOUND)
# The common avrorouter functionality
add_library(avro-common SHARED avro.cc ../binlogrouter/binlog_common.cc avro_client.cc
avro_schema.cc avro_rbr.cc avro_file.cc avro_index.cc avro_converter.cc)
avro_schema.cc avro_rbr.cc avro_file.cc avro_converter.cc)
set_target_properties(avro-common PROPERTIES VERSION "1.0.0" LINK_FLAGS -Wl,-z,defs)
target_link_libraries(avro-common maxscale-common ${JANSSON_LIBRARIES} ${AVRO_LIBRARIES} maxavro lzma)
install_module(avro-common core)

View File

@ -27,7 +27,6 @@
#include <time.h>
#include <glob.h>
#include <ini.h>
#include <sys/stat.h>
#include <avro/errors.h>
#include <maxscale/alloc.h>
#include <maxscale/atomic.h>
@ -50,57 +49,6 @@
using namespace maxscale;
/**
 * Create the tables used by the index in the SQLite database
 *
 * Three `CREATE TABLE IF NOT EXISTS` statements are executed in order: the
 * GTID index table, the used tables table and the indexing progress table.
 * Execution stops at the first statement that fails and an error is logged
 * for it.
 *
 * @param handle SQLite handle
 * @return True if all three statements succeeded, false otherwise
 */
bool create_tables(sqlite3* handle)
{
    static const char gtid_sql[] =
        "CREATE TABLE IF NOT EXISTS "
        GTID_TABLE_NAME "(domain int, server_id int, "
        "sequence bigint, "
        "avrofile varchar(255), "
        "position bigint, "
        "primary key(domain, server_id, sequence, avrofile));";

    static const char used_tables_sql[] =
        "CREATE TABLE IF NOT EXISTS "
        USED_TABLES_TABLE_NAME "(domain int, server_id int, "
        "sequence bigint, binlog_timestamp bigint, "
        "table_name varchar(255));";

    static const char progress_sql[] =
        "CREATE TABLE IF NOT EXISTS "
        INDEX_TABLE_NAME "(position bigint, filename varchar(255));";

    char* exec_error = NULL;
    bool created = false;

    if (sqlite3_exec(handle, gtid_sql, NULL, NULL, &exec_error) != SQLITE_OK)
    {
        MXS_ERROR("Failed to create GTID index table '" GTID_TABLE_NAME "': %s",
                  sqlite3_errmsg(handle));
    }
    else if (sqlite3_exec(handle, used_tables_sql, NULL, NULL, &exec_error) != SQLITE_OK)
    {
        MXS_ERROR("Failed to create used tables table '" USED_TABLES_TABLE_NAME "': %s",
                  sqlite3_errmsg(handle));
    }
    else if (sqlite3_exec(handle, progress_sql, NULL, NULL, &exec_error) != SQLITE_OK)
    {
        MXS_ERROR("Failed to create indexing progress table '" INDEX_TABLE_NAME "': %s",
                  sqlite3_errmsg(handle));
    }
    else
    {
        created = true;
    }

    // Single release point: the message buffer is only set by a failed
    // statement and freeing a NULL pointer is a no-op.
    sqlite3_free(exec_error);
    return created;
}
/**
* @brief Read router options from an external binlogrouter service
*
@ -158,7 +106,6 @@ void Avro::read_source_service_options(SERVICE* source)
//static
Avro* Avro::create(SERVICE* service)
{
bool err = false;
SERVICE* source_service = NULL;
MXS_CONFIG_PARAMETER *param = config_get_param(service->svc_config_param, "source");
@ -178,43 +125,20 @@ Avro* Avro::create(SERVICE* service)
{
MXS_ERROR("Service '%s' uses router module '%s' instead of "
"'binlogrouter'.", source->name, source->routerModule);
err = true;
return NULL;
}
}
else
{
MXS_ERROR("Service '%s' not found.", param->value);
err = true;
return NULL;
}
}
const int flags = SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE;
sqlite3* sqlite_handle;
const char* avrodir = config_get_string(service->svc_config_param, "avrodir");
char dbpath[PATH_MAX + 1];
snprintf(dbpath, sizeof(dbpath), "/%s/%s", avrodir, avro_index_name);
if (sqlite3_open_v2(dbpath, &sqlite_handle, flags, NULL) != SQLITE_OK)
{
MXS_ERROR("Failed to open SQLite database '%s': %s", dbpath, sqlite3_errmsg(sqlite_handle));
err = true;
}
else if (!create_tables(sqlite_handle))
{
err = true;
}
if (err)
{
sqlite3_close_v2(sqlite_handle);
return NULL;
}
return new (std::nothrow) Avro(service, service->svc_config_param,
sqlite_handle, source_service);
return new (std::nothrow) Avro(service, service->svc_config_param, source_service);
}
Avro::Avro(SERVICE* service, MXS_CONFIG_PARAMETER* params, sqlite3* handle, SERVICE* source):
Avro::Avro(SERVICE* service, MXS_CONFIG_PARAMETER* params, SERVICE* source):
service(service),
filestem(config_get_string(params, "filestem")),
binlogdir(config_get_string(params, "binlogdir")),
@ -228,7 +152,6 @@ Avro::Avro(SERVICE* service, MXS_CONFIG_PARAMETER* params, sqlite3* handle, SERV
trx_target(config_get_integer(params, "group_trx")),
row_count(0),
row_target(config_get_integer(params, "group_rows")),
sqlite_handle(handle),
task_handle(0),
stats{0}
{

View File

@ -467,39 +467,6 @@ static int sqlite_cb(void* data, int rows, char** values, char** names)
return 0;
}
static const char select_template[] = "SELECT max(position) FROM gtid WHERE domain=%lu "
"AND server_id=%lu AND sequence <= %lu AND avrofile=\"%s\";";
bool AvroSession::seek_to_index_pos()
{
char *name = strrchr(file_handle->filename, '/');
ss_dassert(name);
name++;
char sql[sizeof(select_template) + NAME_MAX + 80];
snprintf(sql, sizeof(sql), select_template, gtid.domain, gtid.server_id, gtid.seq, name);
long offset = -1;
char *errmsg = NULL;
bool rval = false;
if (sqlite3_exec(sqlite_handle, sql, sqlite_cb, &offset, &errmsg) == SQLITE_OK)
{
rval = true;
if (offset > 0 && !maxavro_record_set_pos(file_handle, offset))
{
rval = false;
}
}
else
{
MXS_ERROR("Failed to query index position for GTID %lu-%lu-%lu: %s",
gtid.domain, gtid.server_id, gtid.seq, errmsg);
}
sqlite3_free(errmsg);
return rval;
}
bool AvroSession::seek_to_gtid()
{
bool seeking = true;
@ -575,7 +542,7 @@ bool AvroSession::stream_data()
{
case AVRO_FORMAT_JSON:
/** Currently only JSON format supports seeking to a GTID */
if (requested_gtid && seek_to_index_pos() && seek_to_gtid())
if (requested_gtid && seek_to_gtid())
{
requested_gtid = false;
}
@ -759,27 +726,10 @@ void AvroSession::client_callback()
// static
AvroSession* AvroSession::create(Avro* inst, MXS_SESSION* session)
{
AvroSession* client = NULL;
sqlite3* handle;
char dbpath[PATH_MAX + 1];
snprintf(dbpath, sizeof(dbpath), "/%s/%s", inst->avrodir.c_str(), avro_index_name);
if (sqlite3_open_v2(dbpath, &handle,
SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL) != SQLITE_OK)
{
MXS_ERROR("Failed to open SQLite database '%s': %s", dbpath, sqlite3_errmsg(handle));
sqlite3_close_v2(handle);
}
else if ((client = new (std::nothrow) AvroSession(inst, session, handle)) == NULL)
{
MXS_OOM();
sqlite3_close_v2(handle);
}
return client;
return new (std::nothrow) AvroSession(inst, session);
}
AvroSession::AvroSession(Avro* instance, MXS_SESSION* session, sqlite3* handle):
AvroSession::AvroSession(Avro* instance, MXS_SESSION* session):
dcb(session->client_dcb),
state(AVRO_CLIENT_UNREGISTERED),
format(AVRO_FORMAT_UNDEFINED),
@ -788,13 +738,11 @@ AvroSession::AvroSession(Avro* instance, MXS_SESSION* session, sqlite3* handle):
file_handle(NULL),
last_sent_pos(0),
connect_time(time(NULL)),
requested_gtid(false),
sqlite_handle(handle)
requested_gtid(false)
{
}
AvroSession::~AvroSession()
{
maxavro_file_close(file_handle);
sqlite3_close_v2(sqlite_handle);
}

View File

@ -38,7 +38,7 @@
#include <maxscale/utils.h>
static const char *statefile_section = "avro-conversion";
static const char *ddl_list_name = "table-ddl.list";
void handle_query_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr);
/**

View File

@ -1,194 +0,0 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2020-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
/**
* @file avro_index.c - GTID to file position index
*
* This file contains functions used to store index information
* about GTID position in an Avro file. Since all records in the Avro file
* that avrorouter uses contain the common GTID field, we can use it to create
* an index. This can then be used to speed up retrieval of Avro records by
* seeking to the offset of the file and reading the record instead of iterating
* through all the records and looking for a matching record.
*
* The index is stored as an SQLite3 database.
*/
#include "avrorouter.hh"
#include <maxscale/debug.h>
#include <glob.h>
static const char insert_template[] = "INSERT INTO gtid(domain, server_id, "
"sequence, avrofile, position) values (%lu, %lu, %lu, \"%s\", %ld);";
/**
 * Copy the GTID fields of a JSON record into a GTID structure
 *
 * The values are read from the `avro_sequence`, `avro_server_id` and
 * `avro_domain` members of the record. Each member is asserted to be a JSON
 * integer.
 *
 * @param gtid Destination; its seq, server_id and domain members are written
 * @param row  JSON record to read the fields from
 */
static void set_gtid(gtid_pos_t *gtid, json_t *row)
{
    json_t *sequence_field = json_object_get(row, avro_sequence);
    json_t *server_id_field = json_object_get(row, avro_server_id);
    json_t *domain_field = json_object_get(row, avro_domain);

    ss_dassert(json_is_integer(sequence_field));
    ss_dassert(json_is_integer(server_id_field));
    ss_dassert(json_is_integer(domain_field));

    gtid->seq = json_integer_value(sequence_field);
    gtid->server_id = json_integer_value(server_id_field);
    gtid->domain = json_integer_value(domain_field);
}
/**
 * Result callback that stores the first non-NULL value as a long
 *
 * Scans `values` from the start and converts the first non-NULL entry to a
 * base-10 long, storing it in the location pointed to by `data`. If every
 * entry is NULL, or `rows` is zero, the destination is left untouched.
 *
 * @param data   Pointer to the `long` that receives the value
 * @param rows   Number of entries in `values`
 * @param values Value strings; entries may be NULL
 * @param names  Unused
 * @return Always 0
 */
int index_query_cb(void *data, int rows, char** values, char** names)
{
    int idx = 0;

    // Skip leading NULL entries
    while (idx < rows && values[idx] == NULL)
    {
        ++idx;
    }

    if (idx < rows)
    {
        long* destination = (long*)data;
        *destination = strtol(values[idx], NULL, 10);
    }

    return 0;
}
/**
 * @brief Add the GTIDs of one Avro file to the index database
 *
 * The last indexed position of the file is read from the INDEX_TABLE_NAME
 * table and, if positive, indexing continues from there. One record is read
 * per data block; whenever its GTID differs from the previously seen one, a
 * row with the GTID, file name and block start position is inserted into the
 * gtid table. All inserts run inside one transaction. Finally the current
 * block start position is stored as the new indexing progress of the file.
 *
 * Failures are logged; the function has no return value.
 *
 * @param router   Router instance whose sqlite_handle is used for the index
 * @param filename Path to the Avro file; must contain a '/' separator
 */
void avro_index_file(Avro *router, const char* filename)
{
    MAXAVRO_FILE *file = maxavro_file_open(filename);

    if (file == NULL)
    {
        MXS_ERROR("Failed to open file '%s' when generating file index.", filename);
        return;
    }

    const char *name = strrchr(filename, '/');
    ss_dassert(name);

    if (name == NULL)
    {
        MXS_ERROR("Malformed filename: %s", filename);
        maxavro_file_close(file);
        return;
    }

    // The index is keyed on the bare file name
    name++;

    char sql[AVRO_SQL_BUFFER_SIZE];
    char *errmsg = NULL;
    long pos = -1;

    snprintf(sql, sizeof(sql), "SELECT position FROM " INDEX_TABLE_NAME
             " WHERE filename=\"%s\";", name);

    if (sqlite3_exec(router->sqlite_handle, sql, index_query_cb, &pos, &errmsg) != SQLITE_OK)
    {
        MXS_ERROR("Failed to read last indexed position of file '%s': %s",
                  name, errmsg);
        sqlite3_free(errmsg);
        maxavro_file_close(file);
        return;
    }

    /** Continue from last position */
    if (pos > 0 && !maxavro_record_set_pos(file, pos))
    {
        maxavro_file_close(file);
        return;
    }

    // Value-initialized so that the first comparison below never reads
    // indeterminate members.
    gtid_pos_t prev_gtid = gtid_pos_t();

    if (sqlite3_exec(router->sqlite_handle, "BEGIN", NULL, NULL, &errmsg) != SQLITE_OK)
    {
        MXS_ERROR("Failed to start transaction: %s", errmsg);
    }
    sqlite3_free(errmsg);
    errmsg = NULL;

    do
    {
        json_t *row = maxavro_record_read_json(file);

        if (row == NULL)
        {
            break;
        }

        gtid_pos_t gtid;
        set_gtid(&gtid, row);

        // Only the first record of a GTID is indexed
        if (prev_gtid.domain != gtid.domain ||
            prev_gtid.server_id != gtid.server_id ||
            prev_gtid.seq != gtid.seq)
        {
            snprintf(sql, sizeof(sql), insert_template, gtid.domain,
                     gtid.server_id, gtid.seq, name, file->block_start_pos);

            if (sqlite3_exec(router->sqlite_handle, sql, NULL, NULL,
                             &errmsg) != SQLITE_OK)
            {
                MXS_ERROR("Failed to insert GTID %lu-%lu-%lu for %s "
                          "into index database: %s", gtid.domain,
                          gtid.server_id, gtid.seq, name, errmsg);
            }
            sqlite3_free(errmsg);
            errmsg = NULL;
            prev_gtid = gtid;
        }

        json_decref(row);
    }
    while (maxavro_next_block(file));

    if (sqlite3_exec(router->sqlite_handle, "COMMIT", NULL, NULL, &errmsg) != SQLITE_OK)
    {
        MXS_ERROR("Failed to commit transaction: %s", errmsg);
    }
    sqlite3_free(errmsg);
    errmsg = NULL;

    // Remember how far this file has been indexed
    snprintf(sql, sizeof(sql), "INSERT OR REPLACE INTO " INDEX_TABLE_NAME
             " values (%lu, \"%s\");", file->block_start_pos, name);

    if (sqlite3_exec(router->sqlite_handle, sql, NULL, NULL,
                     &errmsg) != SQLITE_OK)
    {
        MXS_ERROR("Failed to update indexing progress: %s", errmsg);
    }
    sqlite3_free(errmsg);

    maxavro_file_close(file);
}
/**
 * @brief Avro file indexing task
 *
 * Expands the pattern "<avrodir>/\*.avro" and passes every resulting path to
 * avro_index_file(). Nothing is indexed when the pattern has no matches.
 *
 * @param router The router instance
 */
void avro_update_index(Avro* router)
{
    char pattern[PATH_MAX + 1];
    snprintf(pattern, sizeof(pattern), "%s/*.avro", router->avrodir.c_str());

    glob_t matches;
    const int glob_rc = glob(pattern, 0, NULL, &matches);

    if (glob_rc == GLOB_NOMATCH)
    {
        globfree(&matches);
        return;
    }

    size_t idx = 0;

    while (idx < matches.gl_pathc)
    {
        avro_index_file(router, matches.gl_pathv[idx]);
        ++idx;
    }

    globfree(&matches);
}

View File

@ -263,8 +263,6 @@ bool converter_func(Worker::Call::action_t action, Avro* router)
if (router->current_pos != start_pos || binlog_name != router->binlog_name)
{
/** Update the GTID index */
avro_update_index(router);
progress = true;
}
@ -419,7 +417,6 @@ static bool avro_handle_purge(const MODULECMD_ARG *args, json_t** output)
// Then delete the files
return do_unlink("%s/%s", inst->avrodir.c_str(), AVRO_PROGRESS_FILE) && // State file
do_unlink("/%s/%s", inst->avrodir.c_str(), avro_index_name) && // Index database
do_unlink_with_pattern("/%s/*.avro", inst->avrodir.c_str()) && // .avro files
do_unlink_with_pattern("/%s/*.avsc", inst->avrodir.c_str()); // .avsc files
}

View File

@ -29,7 +29,6 @@
#include <maxscale/pcre2.h>
#include <maxavro.h>
#include <binlog_common.h>
#include <maxscale/sqlite3.h>
#include <maxscale/protocol/mysql.h>
#include <blr_constants.h>
@ -37,38 +36,9 @@
MXS_BEGIN_DECLS
/**
* How often to call the router status function (seconds)
*/
#define AVRO_NSTATS_MINUTES 30
/**
* Avro block grouping defaults
*/
#define AVRO_DEFAULT_BLOCK_TRX_COUNT 1
#define AVRO_DEFAULT_BLOCK_ROW_COUNT 1000
#define MAX_MAPPED_TABLES 1024
#define GTID_TABLE_NAME "gtid"
#define USED_TABLES_TABLE_NAME "used_tables"
#define INDEX_TABLE_NAME "indexing_progress"
/** Name of the file where the binlog to Avro conversion progress is stored */
#define AVRO_PROGRESS_FILE "avro-conversion.ini"
static const char* avro_index_name = "avro.index";
/** Buffer limits */
#define AVRO_SQL_BUFFER_SIZE 2048
/** Avro filename maxlen */
#ifdef NAME_MAX
#define AVRO_MAX_FILENAME_LEN NAME_MAX
#else
#define AVRO_MAX_FILENAME_LEN 255
#endif
static const char *avro_client_states[] = { "Unregistered", "Registered", "Processing", "Errored" };
static const char *avro_client_client_mode[] = { "Catch-up", "Busy", "Wait_for_data" };
@ -177,7 +147,6 @@ public:
uint64_t row_count; /*< Row events processed */
uint64_t row_target; /*< Minimum about of row events that will trigger
* a flush of all tables */
sqlite3* sqlite_handle;
uint32_t task_handle; /**< Delayed task handle */
RowEventHandler* event_hander;
@ -187,7 +156,7 @@ public:
} stats; /*< Statistics for this router */
private:
Avro(SERVICE* service, MXS_CONFIG_PARAMETER* params, sqlite3* handle, SERVICE* source);
Avro(SERVICE* service, MXS_CONFIG_PARAMETER* params, SERVICE* source);
void read_source_service_options(SERVICE* source);
};
@ -213,7 +182,6 @@ public:
bool requested_gtid; /*< If the client requested */
gtid_pos_t gtid; /*< Current/requested GTID */
gtid_pos_t gtid_start; /*< First sent GTID */
sqlite3* sqlite_handle;
/**
* Process a client request
@ -230,7 +198,7 @@ public:
void client_callback();
private:
AvroSession(Avro* instance, MXS_SESSION* session, sqlite3* handle);
AvroSession(Avro* instance, MXS_SESSION* session);
int do_registration(GWBUF *data);
void process_command(GWBUF *queue);
@ -238,7 +206,6 @@ private:
void set_current_gtid(json_t *row);
bool stream_json();
bool stream_binary();
bool seek_to_index_pos();
bool seek_to_gtid();
bool stream_data();
void rotate_avro_file(std::string fullname);
@ -264,7 +231,6 @@ bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr);
void handle_one_event(Avro* router, uint8_t* ptr, REP_HEADER& hdr, uint64_t& pos);
REP_HEADER construct_header(uint8_t* ptr);
bool avro_save_conversion_state(Avro *router);
void avro_update_index(Avro* router);
bool avro_load_conversion_state(Avro *router);
void avro_load_metadata_from_schemas(Avro *router);
void notify_all_clients(Avro *router);