250 lines
		
	
	
		
			7.9 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
			
		
		
	
	
			250 lines
		
	
	
		
			7.9 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
/*
 | 
						|
 * Copyright (c) 2016 MariaDB Corporation Ab
 | 
						|
 *
 | 
						|
 * Use of this software is governed by the Business Source License included
 | 
						|
 * in the LICENSE.TXT file and at www.mariadb.com/bsl.
 | 
						|
 *
 | 
						|
 * Change Date: 2019-07-01
 | 
						|
 *
 | 
						|
 * On the date above, in accordance with the Business Source License, use
 | 
						|
 * of this software will be governed by version 2 or later of the General
 | 
						|
 * Public License.
 | 
						|
 */
 | 
						|
 | 
						|
/**
 | 
						|
 * @file avro_index.c - GTID to file position index
 | 
						|
 *
 | 
						|
 * This file contains functions used to store index information
 | 
						|
 * about GTID position in an Avro file. Since all records in the Avro file
 | 
						|
 * that avrorouter uses contain the common GTID field, we can use it to create
 | 
						|
 * an index. This can then be used to speed up retrieval of Avro records by
 | 
						|
 * seeking to the offset of the file and reading the record instead of iterating
 | 
						|
 * through all the records and looking for a matching record.
 | 
						|
 *
 | 
						|
 * The index is stored as an SQLite3 database.
 | 
						|
 *
 | 
						|
 * @verbatim
 | 
						|
 * Revision History
 | 
						|
 *
 | 
						|
 * Date         Who             Description
 | 
						|
 * 2/04/2016   Markus Mäkelä   Initial implementation
 | 
						|
 *
 | 
						|
 * @endverbatim
 | 
						|
 */
 | 
						|
 | 
						|
#include "avrorouter.h"
 | 
						|
#include <maxscale/debug.h>
 | 
						|
#include <glob.h>
 | 
						|
 | 
						|
/* Forward declaration only; the definition is not in the visible part of this
 * file and nothing visible here calls it. NOTE(review): confirm it is still
 * needed. */
void* safe_key_free(void *data);
 | 
						|
 | 
						|
/**
 * Format string for one row of the gtid table. The arguments are, in order:
 * domain (%lu), server_id (%lu), sequence (%lu), Avro file name (%s) and
 * the position inside that file (%ld).
 */
static const char insert_template[] =
    "INSERT INTO gtid(domain, server_id, sequence, avrofile, position) "
    "values (%lu, %lu, %lu, \"%s\", %ld);";
 | 
						|
 | 
						|
/**
 * @brief Copy the GTID components of a record into a gtid_pos_t
 *
 * Looks up the values stored under the avro_sequence, avro_server_id and
 * avro_domain keys of @c row and stores them in the seq, server_id and domain
 * members of @c gtid. No other member of @c gtid is written. Each value is
 * checked with ss_dassert to be a JSON integer.
 *
 * @param gtid Destination for the GTID components
 * @param row  JSON object to read the components from
 */
static void set_gtid(gtid_pos_t *gtid, json_t *row)
{
    json_t *sequence = json_object_get(row, avro_sequence);
    ss_dassert(json_is_integer(sequence));
    gtid->seq = json_integer_value(sequence);

    json_t *server_id = json_object_get(row, avro_server_id);
    ss_dassert(json_is_integer(server_id));
    gtid->server_id = json_integer_value(server_id);

    json_t *domain = json_object_get(row, avro_domain);
    ss_dassert(json_is_integer(domain));
    gtid->domain = json_integer_value(domain);
}
 | 
						|
 | 
						|
/**
 * @brief sqlite3_exec callback that stores the first non-NULL value as a long
 *
 * Scans the first @c rows entries of @c values and converts the first one
 * that is not NULL with strtol (base 10), writing the result through @c data.
 * If every entry is NULL, or @c rows is not positive, the target is left
 * untouched.
 *
 * @param data   Pointer to the long that receives the converted value
 * @param rows   Number of entries in @c values
 * @param values Values of the result row; entries may be NULL
 * @param names  Not used
 * @return Always 0
 */
int index_query_cb(void *data, int rows, char** values, char** names)
{
    long *position = (long*)data;
    int idx = 0;

    /* Skip leading NULL values */
    while (idx < rows && values[idx] == NULL)
    {
        idx++;
    }

    if (idx < rows)
    {
        *position = strtol(values[idx], NULL, 10);
    }

    return 0;
}
 | 
						|
 | 
						|
/**
 * @brief Add the GTIDs found in one Avro file to the index database
 *
 * The last indexed position of the file is read from INDEX_TABLE_NAME and, if
 * it is greater than zero, reading continues from that position. One record
 * is then read from each block of the file. Whenever the GTID of that record
 * differs from the previously seen GTID, a row with the GTID, the file name
 * and the start position of the current block is inserted into the gtid
 * table. The inserts are wrapped in a BEGIN/COMMIT pair. Finally the current
 * block start position is stored in INDEX_TABLE_NAME as the indexing progress
 * of the file.
 *
 * SQL errors are logged and otherwise ignored. If the file cannot be opened,
 * the function returns without doing anything.
 *
 * @param router   Avro router instance whose sqlite_handle is used
 * @param filename Path to the Avro file; must contain at least one '/'
 */
void avro_index_file(AVRO_INSTANCE *router, const char* filename)
{
    MAXAVRO_FILE *file = maxavro_file_open(filename);

    if (file)
    {
        /* Only the part after the last '/' is stored in the database */
        const char *name = strrchr(filename, '/');
        ss_dassert(name);

        if (name)
        {
            char sql[AVRO_SQL_BUFFER_SIZE];
            char *errmsg = NULL;
            long pos = -1;
            name++;

            snprintf(sql, sizeof(sql), "SELECT position FROM " INDEX_TABLE_NAME
                     " WHERE filename=\"%s\";", name);

            if (sqlite3_exec(router->sqlite_handle, sql, index_query_cb, &pos, &errmsg) != SQLITE_OK)
            {
                MXS_ERROR("Failed to read last indexed position of file '%s': %s",
                          name, errmsg);
            }
            else if (pos > 0)
            {
                /** Continue from last position */
                maxavro_record_set_pos(file, pos);
            }

            sqlite3_free(errmsg);
            errmsg = NULL;

            gtid_pos_t prev_gtid = {0, 0, 0, 0, 0};

            if (sqlite3_exec(router->sqlite_handle, "BEGIN", NULL, NULL, &errmsg) != SQLITE_OK)
            {
                MXS_ERROR("Failed to start transaction: %s", errmsg);
            }

            sqlite3_free(errmsg);
            errmsg = NULL;

            do
            {
                json_t *row = maxavro_record_read_json(file);

                if (row == NULL)
                {
                    break;
                }

                /* set_gtid() fills in only domain, server_id and seq, so start
                 * from a fully initialized value before it is copied below. */
                gtid_pos_t gtid = {0, 0, 0, 0, 0};
                set_gtid(&gtid, row);

                if (prev_gtid.domain != gtid.domain ||
                    prev_gtid.server_id != gtid.server_id ||
                    prev_gtid.seq != gtid.seq)
                {
                    /* NOTE(review): block_start_pos is formatted with %ld here
                     * (via insert_template) but with %lu in the progress update
                     * below; its declared type is not visible in this file, so
                     * confirm which specifier is correct. */
                    snprintf(sql, sizeof(sql), insert_template, gtid.domain,
                             gtid.server_id, gtid.seq, name, file->block_start_pos);

                    if (sqlite3_exec(router->sqlite_handle, sql, NULL, NULL,
                                     &errmsg) != SQLITE_OK)
                    {
                        MXS_ERROR("Failed to insert GTID %lu-%lu-%lu for %s "
                                  "into index database: %s", gtid.domain,
                                  gtid.server_id, gtid.seq, name, errmsg);
                    }

                    sqlite3_free(errmsg);
                    errmsg = NULL;
                    prev_gtid = gtid;
                }

                /* NOTE(review): assumes maxavro_record_read_json() hands the
                 * caller its own reference to the object - confirm. Without
                 * this release one JSON object is leaked per block. */
                json_decref(row);
            }
            while (maxavro_next_block(file));

            if (sqlite3_exec(router->sqlite_handle, "COMMIT", NULL, NULL, &errmsg) != SQLITE_OK)
            {
                MXS_ERROR("Failed to commit transaction: %s", errmsg);
            }

            sqlite3_free(errmsg);
            errmsg = NULL;

            /* Remember how far this file has been indexed */
            snprintf(sql, sizeof(sql), "INSERT OR REPLACE INTO " INDEX_TABLE_NAME
                     " values (%lu, \"%s\");", file->block_start_pos, name);

            if (sqlite3_exec(router->sqlite_handle, sql, NULL, NULL,
                             &errmsg) != SQLITE_OK)
            {
                MXS_ERROR("Failed to update indexing progress: %s", errmsg);
            }

            sqlite3_free(errmsg);
            errmsg = NULL;
        }
        else
        {
            MXS_ERROR("Malformed filename: %s", filename);
        }

        maxavro_file_close(file);
    }
}
 | 
						|
 | 
						|
/**
 | 
						|
 * @brief Avro file indexing task
 | 
						|
 *
 | 
						|
 * Builds an index of filenames, GTIDs and positions in the Avro file.
 | 
						|
 * This allows all tables that contain a GTID to be fetched in an effiecent
 | 
						|
 * manner.
 | 
						|
 * @param data The router instance
 | 
						|
 */
 | 
						|
void avro_update_index(AVRO_INSTANCE* router)
 | 
						|
{
 | 
						|
    char path[PATH_MAX + 1];
 | 
						|
    snprintf(path, sizeof(path), "%s/*.avro", router->avrodir);
 | 
						|
    glob_t files;
 | 
						|
 | 
						|
    if (glob(path, 0, NULL, &files) != GLOB_NOMATCH)
 | 
						|
    {
 | 
						|
        for (int i = 0; i < files.gl_pathc; i++)
 | 
						|
        {
 | 
						|
            avro_index_file(router, files.gl_pathv[i]);
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
    globfree(&files);
 | 
						|
}
 | 
						|
 | 
						|
/** The SQL for the in-memory used_tables table */
 | 
						|
static const char *insert_sql = "INSERT OR IGNORE INTO "MEMORY_TABLE_NAME
 | 
						|
                                "(domain, server_id, sequence, binlog_timestamp, table_name)"
 | 
						|
                                " VALUES (%lu, %lu, %lu, %lu, \"%s\")";
 | 
						|
 | 
						|
/**
 | 
						|
 * @brief Add a used table to the current transaction
 | 
						|
 *
 | 
						|
 * This adds a table to the in-memory table used to store tables used by
 | 
						|
 * transactions. These are later flushed to disk with the Avro records.
 | 
						|
 *
 | 
						|
 * @param router Avro router instance
 | 
						|
 * @param table Table to add
 | 
						|
 */
 | 
						|
void add_used_table(AVRO_INSTANCE* router, char* table)
 | 
						|
{
 | 
						|
    char sql[AVRO_SQL_BUFFER_SIZE], *errmsg;
 | 
						|
    snprintf(sql, sizeof(sql), insert_sql, router->gtid.domain, router->gtid.server_id,
 | 
						|
             router->gtid.seq, router->gtid.timestamp, table);
 | 
						|
 | 
						|
    if (sqlite3_exec(router->sqlite_handle, sql, NULL, NULL, &errmsg) != SQLITE_OK)
 | 
						|
    {
 | 
						|
        MXS_ERROR("Failed to add used table %s for GTID %lu-%lu-%lu: %s",
 | 
						|
                  table, router->gtid.domain, router->gtid.server_id,
 | 
						|
                  router->gtid.seq, errmsg);
 | 
						|
    }
 | 
						|
    sqlite3_free(errmsg);
 | 
						|
}
 | 
						|
 | 
						|
/**
 | 
						|
 * @brief Update the tables used in a transaction
 | 
						|
 *
 | 
						|
 * This flushes the in-memory table to disk and should be called after the
 | 
						|
 * Avro records have been flushed to disk.
 | 
						|
 *
 | 
						|
 * @param router Avro router instance
 | 
						|
 */
 | 
						|
void update_used_tables(AVRO_INSTANCE* router)
 | 
						|
{
 | 
						|
    char *errmsg;
 | 
						|
 | 
						|
    if (sqlite3_exec(router->sqlite_handle, "INSERT INTO "USED_TABLES_TABLE_NAME
 | 
						|
                     " SELECT * FROM "MEMORY_TABLE_NAME, NULL, NULL, &errmsg) != SQLITE_OK)
 | 
						|
    {
 | 
						|
        MXS_ERROR("Failed to transfer used table data from memory to disk: %s", errmsg);
 | 
						|
    }
 | 
						|
    sqlite3_free(errmsg);
 | 
						|
 | 
						|
    if (sqlite3_exec(router->sqlite_handle, "DELETE FROM "MEMORY_TABLE_NAME,
 | 
						|
                     NULL, NULL, &errmsg) != SQLITE_OK)
 | 
						|
    {
 | 
						|
        MXS_ERROR("Failed to transfer used table data from memory to disk: %s", errmsg);
 | 
						|
    }
 | 
						|
    sqlite3_free(errmsg);
 | 
						|
}
 |