257 lines
		
	
	
		
			8.0 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
			
		
		
	
	
			257 lines
		
	
	
		
			8.0 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
| /*
 | |
|  * Copyright (c) 2016 MariaDB Corporation Ab
 | |
|  *
 | |
|  * Use of this software is governed by the Business Source License included
 | |
|  * in the LICENSE.TXT file and at www.mariadb.com/bsl11.
 | |
|  *
 | |
|  * Change Date: 2020-01-01
 | |
|  *
 | |
|  * On the date above, in accordance with the Business Source License, use
 | |
|  * of this software will be governed by version 2 or later of the General
 | |
|  * Public License.
 | |
|  */
 | |
| 
 | |
| /**
 | |
|  * @file avro_index.c - GTID to file position index
 | |
|  *
 | |
|  * This file contains functions used to store index information
 | |
|  * about GTID position in an Avro file. Since all records in the Avro file
 | |
|  * that avrorouter uses contain the common GTID field, we can use it to create
 | |
|  * an index. This can then be used to speed up retrieval of Avro records by
 | |
|  * seeking to the offset of the file and reading the record instead of iterating
 | |
|  * through all the records and looking for a matching record.
 | |
|  *
 | |
|  * The index is stored as an SQLite3 database.
 | |
|  *
 | |
|  * @verbatim
 | |
|  * Revision History
 | |
|  *
 | |
|  * Date         Who             Description
 | |
|  * 2/04/2016   Markus Mäkelä   Initial implementation
 | |
|  *
 | |
|  * @endverbatim
 | |
|  */
 | |
| 
 | |
| #include "avrorouter.h"
 | |
| 
 | |
| #include <maxscale/debug.h>
 | |
| #include <glob.h>
 | |
| 
 | |
| void* safe_key_free(void *data);
 | |
| 
 | |
| static const char insert_template[] = "INSERT INTO gtid(domain, server_id, "
 | |
|                                       "sequence, avrofile, position) values (%lu, %lu, %lu, \"%s\", %ld);";
 | |
| 
 | |
| static void set_gtid(gtid_pos_t *gtid, json_t *row)
 | |
| {
 | |
|     json_t *obj = json_object_get(row, avro_sequence);
 | |
|     ss_dassert(json_is_integer(obj));
 | |
|     gtid->seq = json_integer_value(obj);
 | |
| 
 | |
|     obj = json_object_get(row, avro_server_id);
 | |
|     ss_dassert(json_is_integer(obj));
 | |
|     gtid->server_id = json_integer_value(obj);
 | |
| 
 | |
|     obj = json_object_get(row, avro_domain);
 | |
|     ss_dassert(json_is_integer(obj));
 | |
|     gtid->domain = json_integer_value(obj);
 | |
| }
 | |
| 
 | |
| int index_query_cb(void *data, int rows, char** values, char** names)
 | |
| {
 | |
|     for (int i = 0; i < rows; i++)
 | |
|     {
 | |
|         if (values[i])
 | |
|         {
 | |
|             *((long*)data) = strtol(values[i], NULL, 10);
 | |
|             return 0;
 | |
|         }
 | |
|     }
 | |
|     return 0;
 | |
| }
 | |
| 
 | |
| void avro_index_file(AVRO_INSTANCE *router, const char* filename)
 | |
| {
 | |
|     MAXAVRO_FILE *file = maxavro_file_open(filename);
 | |
| 
 | |
|     if (file)
 | |
|     {
 | |
|         char *name = strrchr(filename, '/');
 | |
|         ss_dassert(name);
 | |
| 
 | |
|         if (name)
 | |
|         {
 | |
|             char sql[AVRO_SQL_BUFFER_SIZE];
 | |
|             char *errmsg;
 | |
|             long pos = -1;
 | |
|             name++;
 | |
| 
 | |
|             snprintf(sql, sizeof(sql), "SELECT position FROM "INDEX_TABLE_NAME
 | |
|                      " WHERE filename=\"%s\";", name);
 | |
| 
 | |
|             if (sqlite3_exec(router->sqlite_handle, sql, index_query_cb, &pos, &errmsg) != SQLITE_OK)
 | |
|             {
 | |
|                 MXS_ERROR("Failed to read last indexed position of file '%s': %s",
 | |
|                           name, errmsg);
 | |
|                 sqlite3_free(errmsg);
 | |
|                 maxavro_file_close(file);
 | |
|                 return;
 | |
|             }
 | |
| 
 | |
|             /** Continue from last position */
 | |
|             if (pos > 0 && !maxavro_record_set_pos(file, pos))
 | |
|             {
 | |
|                 maxavro_file_close(file);
 | |
|                 return;
 | |
|             }
 | |
| 
 | |
|             gtid_pos_t prev_gtid = {0, 0, 0, 0, 0};
 | |
| 
 | |
|             if (sqlite3_exec(router->sqlite_handle, "BEGIN", NULL, NULL, &errmsg) != SQLITE_OK)
 | |
|             {
 | |
|                 MXS_ERROR("Failed to start transaction: %s", errmsg);
 | |
|             }
 | |
|             sqlite3_free(errmsg);
 | |
| 
 | |
|             do
 | |
|             {
 | |
|                 json_t *row = maxavro_record_read_json(file);
 | |
| 
 | |
|                 if (row)
 | |
|                 {
 | |
|                     gtid_pos_t gtid;
 | |
|                     set_gtid(>id, row);
 | |
| 
 | |
|                     if (prev_gtid.domain != gtid.domain ||
 | |
|                         prev_gtid.server_id != gtid.server_id ||
 | |
|                         prev_gtid.seq != gtid.seq)
 | |
|                     {
 | |
|                         snprintf(sql, sizeof(sql), insert_template, gtid.domain,
 | |
|                                  gtid.server_id, gtid.seq, name, file->block_start_pos);
 | |
|                         if (sqlite3_exec(router->sqlite_handle, sql, NULL, NULL,
 | |
|                                          &errmsg) != SQLITE_OK)
 | |
|                         {
 | |
|                             MXS_ERROR("Failed to insert GTID %lu-%lu-%lu for %s "
 | |
|                                       "into index database: %s", gtid.domain,
 | |
|                                       gtid.server_id, gtid.seq, name, errmsg);
 | |
| 
 | |
|                         }
 | |
|                         sqlite3_free(errmsg);
 | |
|                         errmsg = NULL;
 | |
|                         prev_gtid = gtid;
 | |
|                     }
 | |
|                     json_decref(row);
 | |
|                 }
 | |
|                 else
 | |
|                 {
 | |
|                     break;
 | |
|                 }
 | |
|             }
 | |
|             while (maxavro_next_block(file));
 | |
| 
 | |
|             if (sqlite3_exec(router->sqlite_handle, "COMMIT", NULL, NULL, &errmsg) != SQLITE_OK)
 | |
|             {
 | |
|                 MXS_ERROR("Failed to commit transaction: %s", errmsg);
 | |
|             }
 | |
|             sqlite3_free(errmsg);
 | |
| 
 | |
|             snprintf(sql, sizeof(sql), "INSERT OR REPLACE INTO "INDEX_TABLE_NAME
 | |
|                      " values (%lu, \"%s\");", file->block_start_pos, name);
 | |
|             if (sqlite3_exec(router->sqlite_handle, sql, NULL, NULL,
 | |
|                              &errmsg) != SQLITE_OK)
 | |
|             {
 | |
|                 MXS_ERROR("Failed to update indexing progress: %s", errmsg);
 | |
|             }
 | |
|             sqlite3_free(errmsg);
 | |
|             errmsg = NULL;
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|             MXS_ERROR("Malformed filename: %s", filename);
 | |
|         }
 | |
| 
 | |
|         maxavro_file_close(file);
 | |
|     }
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * @brief Avro file indexing task
 | |
|  *
 | |
|  * Builds an index of filenames, GTIDs and positions in the Avro file.
 | |
|  * This allows all tables that contain a GTID to be fetched in an effiecent
 | |
|  * manner.
 | |
|  * @param data The router instance
 | |
|  */
 | |
| void avro_update_index(AVRO_INSTANCE* router)
 | |
| {
 | |
|     char path[PATH_MAX + 1];
 | |
|     snprintf(path, sizeof(path), "%s/*.avro", router->avrodir);
 | |
|     glob_t files;
 | |
| 
 | |
|     if (glob(path, 0, NULL, &files) != GLOB_NOMATCH)
 | |
|     {
 | |
|         for (int i = 0; i < files.gl_pathc; i++)
 | |
|         {
 | |
|             avro_index_file(router, files.gl_pathv[i]);
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     globfree(&files);
 | |
| }
 | |
| 
 | |
| /** The SQL for the in-memory used_tables table */
 | |
| static const char *insert_sql = "INSERT OR IGNORE INTO "MEMORY_TABLE_NAME
 | |
|                                 "(domain, server_id, sequence, binlog_timestamp, table_name)"
 | |
|                                 " VALUES (%lu, %lu, %lu, %u, \"%s\")";
 | |
| 
 | |
| /**
 | |
|  * @brief Add a used table to the current transaction
 | |
|  *
 | |
|  * This adds a table to the in-memory table used to store tables used by
 | |
|  * transactions. These are later flushed to disk with the Avro records.
 | |
|  *
 | |
|  * @param router Avro router instance
 | |
|  * @param table Table to add
 | |
|  */
 | |
| void add_used_table(AVRO_INSTANCE* router, char* table)
 | |
| {
 | |
|     char sql[AVRO_SQL_BUFFER_SIZE], *errmsg;
 | |
|     snprintf(sql, sizeof(sql), insert_sql, router->gtid.domain, router->gtid.server_id,
 | |
|              router->gtid.seq, router->gtid.timestamp, table);
 | |
| 
 | |
|     if (sqlite3_exec(router->sqlite_handle, sql, NULL, NULL, &errmsg) != SQLITE_OK)
 | |
|     {
 | |
|         MXS_ERROR("Failed to add used table %s for GTID %lu-%lu-%lu: %s",
 | |
|                   table, router->gtid.domain, router->gtid.server_id,
 | |
|                   router->gtid.seq, errmsg);
 | |
|     }
 | |
|     sqlite3_free(errmsg);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * @brief Update the tables used in a transaction
 | |
|  *
 | |
|  * This flushes the in-memory table to disk and should be called after the
 | |
|  * Avro records have been flushed to disk.
 | |
|  *
 | |
|  * @param router Avro router instance
 | |
|  */
 | |
| void update_used_tables(AVRO_INSTANCE* router)
 | |
| {
 | |
|     char *errmsg;
 | |
| 
 | |
|     if (sqlite3_exec(router->sqlite_handle, "INSERT INTO "USED_TABLES_TABLE_NAME
 | |
|                      " SELECT * FROM "MEMORY_TABLE_NAME, NULL, NULL, &errmsg) != SQLITE_OK)
 | |
|     {
 | |
|         MXS_ERROR("Failed to transfer used table data from memory to disk: %s", errmsg);
 | |
|     }
 | |
|     sqlite3_free(errmsg);
 | |
| 
 | |
|     if (sqlite3_exec(router->sqlite_handle, "DELETE FROM "MEMORY_TABLE_NAME,
 | |
|                      NULL, NULL, &errmsg) != SQLITE_OK)
 | |
|     {
 | |
|         MXS_ERROR("Failed to transfer used table data from memory to disk: %s", errmsg);
 | |
|     }
 | |
|     sqlite3_free(errmsg);
 | |
| }
 | 
