From 8c22131827d074d25100f9f7b4cf095a074ca6da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Thu, 7 Jun 2018 21:51:00 +0300 Subject: [PATCH] Remove redundant or unused avrorouter code The code in avrorouter that returned the current transaction was not very useful and it can be acquired via the REST API in a more convenient format. The number of created sessions is tracked on the service level so there is no need to track it in the avrorouter. Removed declarations for functions that do not exist and moved code around to reduce the scope. --- Documentation/Protocols/CDC.md | 26 -- .../MaxScale-2.3.0-Release-Notes.md | 6 + server/modules/routing/avrorouter/avro.cc | 51 +-- .../modules/routing/avrorouter/avro_client.cc | 359 ++++-------------- .../modules/routing/avrorouter/avro_file.cc | 6 - .../modules/routing/avrorouter/avro_index.cc | 58 --- .../modules/routing/avrorouter/avro_main.cc | 8 - server/modules/routing/avrorouter/avro_rbr.cc | 5 - .../modules/routing/avrorouter/avrorouter.hh | 50 +-- 9 files changed, 108 insertions(+), 461 deletions(-) diff --git a/Documentation/Protocols/CDC.md b/Documentation/Protocols/CDC.md index 0eaf13173..5b979ee20 100644 --- a/Documentation/Protocols/CDC.md +++ b/Documentation/Protocols/CDC.md @@ -100,32 +100,6 @@ REQUEST-DATA dbi1.table1.000003 REQUEST-DATA db2.table4 0-11-345 ``` -#### QUERY-LAST-TRANSACTION - -`QUERY-LAST-TRANSACTION` - -Returns JSON with last GTID, timestamp and affected tables. - -Example output: - -``` -{"GTID": "0-1-178", "events": 2, "timestamp": 1462290084, "tables": ["db1.tb1", “db2.tb2”]} -``` - -Last GTID could then be used in a REQUEST-DATA query. - -#### QUERY-TRANSACTION - -`QUERY-TRANSACTION GTID` - -Returns JSON from specified GTID, the commit timestamp and affected tables. - -Example: - -``` -QUERY-TRANSACTION 0-14-1245 -``` - ## Example Client MaxScale includes an example CDC client application written in Python 3. You can diff --git a/Documentation/Release-Notes/MaxScale-2.3.0-Release-Notes.md b/Documentation/Release-Notes/MaxScale-2.3.0-Release-Notes.md index 4f797e8ef..56fe43eee 100644 --- a/Documentation/Release-Notes/MaxScale-2.3.0-Release-Notes.md +++ b/Documentation/Release-Notes/MaxScale-2.3.0-Release-Notes.md @@ -42,6 +42,12 @@ The use of `router_options` with avrorouter was deprecated in MaxScale 2.1. In MaxScale 2.3, the use of `router_options` is no longer supported and the options should be given as parameters instead. +### `QUERY-LAST-TRANSACTION` and `QUERY-TRANSACTION` CDC commands + +The CDC protocol no longer accepts the `QUERY-LAST-TRANSACTION` and +`QUERY-TRANSACTION` commands. They were removed due to the addition of the REST +API that provides the same information in a more easy to process format. + ## New Features ### Runtime Configuration of the Cache diff --git a/server/modules/routing/avrorouter/avro.cc b/server/modules/routing/avrorouter/avro.cc index d15fe1100..e20846150 100644 --- a/server/modules/routing/avrorouter/avro.cc +++ b/server/modules/routing/avrorouter/avro.cc @@ -50,30 +50,6 @@ using namespace maxscale; -#ifndef BINLOG_NAMEFMT -#define BINLOG_NAMEFMT "%s.%06d" -#endif - -/** For detection of CREATE/ALTER TABLE statements */ -static const char* create_table_regex = - "(?i)create[a-z0-9[:space:]_]+table"; -static const char* alter_table_regex = - "(?i)alter[[:space:]]+table"; - -extern void avro_get_used_tables(Avro *router, DCB *dcb); -bool converter_func(Worker::Call::action_t action, Avro* router); -bool binlog_next_file_exists(const char* binlogdir, const char* binlog); -int blr_file_get_next_binlogname(const char *router); -bool avro_load_conversion_state(Avro *router); -void avro_load_metadata_from_schemas(Avro *router); -int avro_client_callback(DCB *dcb, DCB_REASON reason, void *userdata); -static bool ensure_dir_ok(std::string path, int mode); -static void stats_func(void *); -void avro_index_file(Avro *router, const char* path); - -static SPINLOCK instlock; -static Avro *instances; - /** * Create the required tables in the sqlite database * @@ -122,30 +98,6 @@ bool create_tables(sqlite3* handle) return false; } - rc = sqlite3_exec(handle, "ATTACH DATABASE ':memory:' AS " MEMORY_DATABASE_NAME, - NULL, NULL, &errmsg); - if (rc != SQLITE_OK) - { - MXS_ERROR("Failed to attach in-memory database '" MEMORY_DATABASE_NAME "': %s", - sqlite3_errmsg(handle)); - sqlite3_free(errmsg); - return false; - } - - rc = sqlite3_exec(handle, "CREATE TABLE " MEMORY_TABLE_NAME - "(domain int, server_id int, " - "sequence bigint, binlog_timestamp bigint, " - "table_name varchar(255), primary key (domain, server_id, sequence, table_name));", - NULL, NULL, &errmsg); - if (rc != SQLITE_OK) - { - MXS_ERROR("Failed to create in-memory used tables table '" MEMORY_DATABASE_NAME - "." MEMORY_TABLE_NAME "': %s", - sqlite3_errmsg(handle)); - sqlite3_free(errmsg); - return false; - } - return true; } @@ -286,6 +238,9 @@ Avro::Avro(SERVICE* service, MXS_CONFIG_PARAMETER* params, sqlite3* handle, SERV // TODO: pass this as a parameter or something event_hander = new AvroConverter(avrodir, block_size, codec); + /** For detection of CREATE/ALTER TABLE statements */ + static const char* create_table_regex = "(?i)create[a-z0-9[:space:]_]+table"; + static const char* alter_table_regex = "(?i)alter[[:space:]]+table"; int pcreerr; size_t erroff; create_table_re = pcre2_compile((PCRE2_SPTR) create_table_regex, PCRE2_ZERO_TERMINATED, diff --git a/server/modules/routing/avrorouter/avro_client.cc b/server/modules/routing/avrorouter/avro_client.cc index d84bae44d..23893adbf 100644 --- a/server/modules/routing/avrorouter/avro_client.cc +++ b/server/modules/routing/avrorouter/avro_client.cc @@ -37,17 +37,16 @@ #include #include -extern char *blr_extract_column(GWBUF *buf, int col); -extern uint32_t extract_field(uint8_t *src, int bits); - -/* AVRO */ -int avro_client_callback(DCB *dcb, DCB_REASON reason, void *data); -void avro_notify_client(AvroSession *client); -void poll_fake_write_event(DCB *dcb); -GWBUF* read_avro_json_schema(const char *avrofile, const char* dir); -GWBUF* read_avro_binary_schema(const char *avrofile, const char* dir); std::pair get_avrofile_and_gtid(std::string file); +enum +{ + AVRO_CLIENT_UNREGISTERED, + AVRO_CLIENT_REGISTERED, + AVRO_CLIENT_REQUEST_DATA, + AVRO_CLIENT_ERRORED, +}; + int AvroSession::routeQuery(GWBUF *queue) { int rval = 1; @@ -220,183 +219,6 @@ void extract_gtid_request(gtid_pos_t *gtid, const char *start, int len) } } -/** - * Callback for GTID retrieval - * @param data User data - * @param ncolumns Number of columns - * @param values Row data - * @param names Field names - * @return 0 on success - */ -int gtid_query_cb(void* data, int ncolumns, char** values, char** names) -{ - json_t *arr = (json_t*)data; - - if (values[0]) - { - json_array_append_new(arr, json_string(values[0])); - } - - return 0; -} - -/** - * Callback for GTID retrieval - * @param data User data - * @param ncolumns Number of columns - * @param values Row data - * @param names Field names - * @return 0 on success - */ -int gtid_query_cb_plain(void* data, int ncolumns, char** values, char** names) -{ - DCB *dcb = (DCB *)data; - if (values[0]) - { - dcb_printf(dcb, "%s ", values[0]); - } - - return 0; -} - -/** - * Add the tables involved in the latest transaction to a JSON object - * - * @param handle SQLite3 handle - * @param obj JSON object to add values to - * @param gtid GTID of the last transaction - */ -void add_used_tables(sqlite3 *handle, json_t* obj, gtid_pos_t* gtid) -{ - char sql[AVRO_SQL_BUFFER_SIZE]; - snprintf(sql, sizeof(sql), "SELECT table_name FROM " USED_TABLES_TABLE_NAME - " WHERE domain = %lu AND server_id = %lu AND sequence = %lu", - gtid->domain, gtid->server_id, gtid->seq); - - char* errmsg; - json_t *arr = json_array(); - - if (sqlite3_exec(handle, sql, gtid_query_cb, arr, - &errmsg) != SQLITE_OK) - { - json_decref(arr); - MXS_ERROR("Failed to execute query: %s", errmsg); - } - else - { - json_object_set_new(obj, "tables", arr); - } - sqlite3_free(errmsg); -} - -/** - * Get the tables involved in the latest transaction. - * - * Sqlite3 callback routine calls dcb_printf() - * - * @param router The AVRO router instance - * @param dcb The dcb to write data - */ -void avro_get_used_tables(Avro *router, DCB* dcb) -{ - sqlite3 *handle = router->sqlite_handle; - char sql[AVRO_SQL_BUFFER_SIZE]; - snprintf(sql, sizeof(sql), "SELECT table_name FROM " USED_TABLES_TABLE_NAME - " WHERE domain = %lu AND server_id = %lu AND sequence = %lu", - router->gtid.domain, router->gtid.server_id, router->gtid.seq); - - char* errmsg; - - /* call dcb_printf via callback */ - if (sqlite3_exec(handle, sql, gtid_query_cb_plain, dcb, - &errmsg) != SQLITE_OK) - { - MXS_ERROR("Failed to execute query: %s", errmsg); - } - sqlite3_free(errmsg); -} - -/** - * Callback for timestamp retrieval - * - * @param data User data - * @param ncolumns Number of columns - * @param values Row data - * @param names Field names - * @return 0 on success - */ -int timestamp_query_cb(void* data, int ncolumns, char** values, char** names) -{ - long *val = (long*)data; - - if (values[0]) - { - *val = strtol(values[0], NULL, 10); - } - - return 0; -} - -/** - * Add the GTID timestamp to a JSON object - * - * @param handle - * @param obj - * @param gtid - */ -void add_timestamp(sqlite3 *handle, json_t* obj, gtid_pos_t* gtid) -{ - char sql[AVRO_SQL_BUFFER_SIZE]; - snprintf(sql, sizeof(sql), "SELECT DISTINCT binlog_timestamp FROM " USED_TABLES_TABLE_NAME - " WHERE domain = %lu AND server_id = %lu AND sequence = %lu", - gtid->domain, gtid->server_id, gtid->seq); - - char* errmsg; - long ts = 0; - - if (sqlite3_exec(handle, sql, timestamp_query_cb, &ts, - &errmsg) == SQLITE_OK) - { - json_object_set_new(obj, "timestamp", json_integer(ts)); - } - else - { - MXS_ERROR("Failed to execute query: %s", errmsg); - } - sqlite3_free(errmsg); - -} - -/** - * Send information about the current GTID being processed - * @param router Router instance - * @param dcb Client DCB - */ -void AvroSession::send_gtid_info(gtid_pos_t* gtid_pos) -{ - json_t *obj = json_object(); - - if (obj) - { - char gtid[256]; - snprintf(gtid, sizeof(gtid), "%lu-%lu-%lu", gtid_pos->domain, - gtid_pos->server_id, gtid_pos->seq); - json_object_set_new(obj, "GTID", json_string(gtid)); - - // TODO: Store number of events in the database - json_object_set_new(obj, "events", json_integer(gtid_pos->event_num)); - - add_timestamp(router->sqlite_handle, obj, gtid_pos); - add_used_tables(router->sqlite_handle, obj, gtid_pos); - - char *js = json_dumps(obj, 0); - size_t size = strlen(js); - GWBUF *buffer = gwbuf_alloc_and_load(size, js); - MXS_FREE(js); - dcb->func.write(dcb, buffer); - } -} - /** * @brief Check if a file exists in a directory * @@ -414,79 +236,22 @@ bool file_in_dir(const char *dir, const char *file) } /** - * Process command from client - * - * @param data Buffer containing the command + * @brief The client callback for sending data * + * @param dcb Client DCB + * @param reason Why the callback was called + * @param userdata Data provided when the callback was added + * @return Always 0 */ -void AvroSession::process_command(GWBUF *queue) +int avro_client_callback(DCB *dcb, DCB_REASON reason, void *userdata) { - const char req_data[] = "REQUEST-DATA"; - const char req_last_gtid[] = "QUERY-LAST-TRANSACTION"; - const char req_gtid[] = "QUERY-TRANSACTION"; - const size_t req_data_len = sizeof(req_data) - 1; - size_t buflen = gwbuf_length(queue); - uint8_t data[buflen + 1]; - gwbuf_copy_data(queue, 0, buflen, data); - data[buflen] = '\0'; - char *command_ptr = strstr((char *)data, req_data); - - if (command_ptr != NULL) + if (reason == DCB_REASON_DRAINED) { - char *file_ptr = command_ptr + req_data_len; - int data_len = GWBUF_LENGTH(queue) - req_data_len; - - if (data_len > 1) - { - auto file_and_gtid = get_avrofile_and_gtid(file_ptr); - - if (!file_and_gtid.second.empty()) - { - requested_gtid = true; - extract_gtid_request(>id, file_and_gtid.second.c_str(), file_and_gtid.second.size()); - memcpy(>id_start, >id, sizeof(gtid_start)); - } - - avro_binfile = file_and_gtid.first; - - if (file_in_dir(router->avrodir.c_str(), avro_binfile.c_str())) - { - /* set callback routine for data sending */ - dcb_add_callback(dcb, DCB_REASON_DRAINED, avro_client_callback, this); - - /* Add fake event that will call the avro_client_callback() routine */ - poll_fake_write_event(dcb); - } - else - { - dcb_printf(dcb, "ERR NO-FILE File '%s' not found.", avro_binfile.c_str()); - } - } - else - { - dcb_printf(dcb, "ERR REQUEST-DATA with no data"); - } - } - /* Return last GTID info */ - else if (strstr((char *)data, req_last_gtid)) - { - send_gtid_info(&router->gtid); - } - /** Return requested GTID */ - else if (strstr((char *)data, req_gtid)) - { - gtid_pos_t gtid; - extract_gtid_request(>id, (char*)data + sizeof(req_gtid), - GWBUF_LENGTH(queue) - sizeof(req_gtid)); - send_gtid_info(>id); - } - else - { - GWBUF *reply = gwbuf_alloc(5); - memcpy(GWBUF_DATA(reply), "ECHO:", 5); - reply = gwbuf_append(reply, gwbuf_clone(queue)); - dcb->func.write(dcb, reply); + AvroSession *client = static_cast(userdata); + client->client_callback(); } + + return 0; } /** @@ -534,6 +299,66 @@ std::pair get_avrofile_and_gtid(std::string file) return std::make_pair(filename, gtid); } +/** + * Process command from client + * + * @param data Buffer containing the command + * + */ +void AvroSession::process_command(GWBUF *queue) +{ + const char req_data[] = "REQUEST-DATA"; + const size_t req_data_len = sizeof(req_data) - 1; + size_t buflen = gwbuf_length(queue); + uint8_t data[buflen + 1]; + gwbuf_copy_data(queue, 0, buflen, data); + data[buflen] = '\0'; + char *command_ptr = strstr((char *)data, req_data); + + if (command_ptr != NULL) + { + char *file_ptr = command_ptr + req_data_len; + int data_len = GWBUF_LENGTH(queue) - req_data_len; + + if (data_len > 1) + { + auto file_and_gtid = get_avrofile_and_gtid(file_ptr); + + if (!file_and_gtid.second.empty()) + { + requested_gtid = true; + extract_gtid_request(>id, file_and_gtid.second.c_str(), file_and_gtid.second.size()); + memcpy(>id_start, >id, sizeof(gtid_start)); + } + + avro_binfile = file_and_gtid.first; + + if (file_in_dir(router->avrodir.c_str(), avro_binfile.c_str())) + { + /* set callback routine for data sending */ + dcb_add_callback(dcb, DCB_REASON_DRAINED, avro_client_callback, this); + + /* Add fake event that will call the avro_client_callback() routine */ + poll_fake_write_event(dcb); + } + else + { + dcb_printf(dcb, "ERR NO-FILE File '%s' not found.", avro_binfile.c_str()); + } + } + else + { + dcb_printf(dcb, "ERR REQUEST-DATA with no data"); + } + } + else + { + const char err[] = "ERR: Unknown command"; + GWBUF *reply = gwbuf_alloc_and_load(sizeof(err), err); + dcb->func.write(dcb, reply); + } +} + static int send_row(DCB *dcb, json_t* row) { char *json = json_dumps(row, JSON_PRESERVE_ORDER); @@ -883,25 +708,6 @@ static std::string get_next_filename(std::string file, std::string dir) return std::string(outbuf); } -/** - * @brief The client callback for sending data - * - * @param dcb Client DCB - * @param reason Why the callback was called - * @param userdata Data provided when the callback was added - * @return Always 0 - */ -int avro_client_callback(DCB *dcb, DCB_REASON reason, void *userdata) -{ - if (reason == DCB_REASON_DRAINED) - { - AvroSession *client = static_cast(userdata); - client->client_callback(); - } - - return 0; -} - void AvroSession::client_callback() { if (last_sent_pos == 0) @@ -969,10 +775,6 @@ AvroSession* AvroSession::create(Avro* inst, MXS_SESSION* session) MXS_OOM(); sqlite3_close_v2(handle); } - else - { - atomic_add(&inst->stats.n_clients, 1); - } return client; } @@ -993,9 +795,6 @@ AvroSession::AvroSession(Avro* instance, MXS_SESSION* session, sqlite3* handle): AvroSession::~AvroSession() { - ss_debug(int prev_val = )atomic_add(&router->stats.n_clients, -1); - ss_dassert(prev_val > 0); - maxavro_file_close(file_handle); sqlite3_close_v2(sqlite_handle); } diff --git a/server/modules/routing/avrorouter/avro_file.cc b/server/modules/routing/avrorouter/avro_file.cc index 2a9913e00..e9e3a32ce 100644 --- a/server/modules/routing/avrorouter/avro_file.cc +++ b/server/modules/routing/avrorouter/avro_file.cc @@ -40,11 +40,6 @@ static const char *statefile_section = "avro-conversion"; static const char *ddl_list_name = "table-ddl.list"; void handle_query_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr); -bool is_create_table_statement(Avro *router, char* ptr, size_t len); -void avro_notify_client(AvroSession *client); -void update_used_tables(Avro* router); -TableCreateEvent* table_create_from_schema(const char* file, const char* db, - const char* table, int version); /** * Open a binlog file for reading @@ -385,7 +380,6 @@ void notify_all_clients(Avro *router) void do_checkpoint(Avro *router) { - update_used_tables(router); router->event_hander->flush_tables(); avro_save_conversion_state(router); notify_all_clients(router); diff --git a/server/modules/routing/avrorouter/avro_index.cc b/server/modules/routing/avrorouter/avro_index.cc index 81a2f2cd3..816c00b11 100644 --- a/server/modules/routing/avrorouter/avro_index.cc +++ b/server/modules/routing/avrorouter/avro_index.cc @@ -29,8 +29,6 @@ #include #include -void* safe_key_free(void *data); - static const char insert_template[] = "INSERT INTO gtid(domain, server_id, " "sequence, avrofile, position) values (%lu, %lu, %lu, \"%s\", %ld);"; @@ -194,59 +192,3 @@ void avro_update_index(Avro* router) globfree(&files); } - -/** The SQL for the in-memory used_tables table */ -static const char *insert_sql = "INSERT OR IGNORE INTO " MEMORY_TABLE_NAME - "(domain, server_id, sequence, binlog_timestamp, table_name)" - " VALUES (%lu, %lu, %lu, %u, \"%s\")"; - -/** - * @brief Add a used table to the current transaction - * - * This adds a table to the in-memory table used to store tables used by - * transactions. These are later flushed to disk with the Avro records. - * - * @param router Avro router instance - * @param table Table to add - */ -void add_used_table(Avro* router, const char* table) -{ - char sql[AVRO_SQL_BUFFER_SIZE], *errmsg; - snprintf(sql, sizeof(sql), insert_sql, router->gtid.domain, router->gtid.server_id, - router->gtid.seq, router->gtid.timestamp, table); - - if (sqlite3_exec(router->sqlite_handle, sql, NULL, NULL, &errmsg) != SQLITE_OK) - { - MXS_ERROR("Failed to add used table %s for GTID %lu-%lu-%lu: %s", - table, router->gtid.domain, router->gtid.server_id, - router->gtid.seq, errmsg); - } - sqlite3_free(errmsg); -} - -/** - * @brief Update the tables used in a transaction - * - * This flushes the in-memory table to disk and should be called after the - * Avro records have been flushed to disk. - * - * @param router Avro router instance - */ -void update_used_tables(Avro* router) -{ - char *errmsg; - - if (sqlite3_exec(router->sqlite_handle, "INSERT INTO " USED_TABLES_TABLE_NAME - " SELECT * FROM " MEMORY_TABLE_NAME, NULL, NULL, &errmsg) != SQLITE_OK) - { - MXS_ERROR("Failed to transfer used table data from memory to disk: %s", errmsg); - } - sqlite3_free(errmsg); - - if (sqlite3_exec(router->sqlite_handle, "DELETE FROM " MEMORY_TABLE_NAME, - NULL, NULL, &errmsg) != SQLITE_OK) - { - MXS_ERROR("Failed to transfer used table data from memory to disk: %s", errmsg); - } - sqlite3_free(errmsg); -} diff --git a/server/modules/routing/avrorouter/avro_main.cc b/server/modules/routing/avrorouter/avro_main.cc index 356652f8d..365eda990 100644 --- a/server/modules/routing/avrorouter/avro_main.cc +++ b/server/modules/routing/avrorouter/avro_main.cc @@ -163,13 +163,6 @@ diagnostics(MXS_ROUTER *router, DCB *dcb) router_inst->gtid.timestamp); dcb_printf(dcb, "\tCurrent GTID #events: %lu\n", router_inst->gtid.event_num); - - dcb_printf(dcb, "\tCurrent GTID affected tables: "); - dcb_printf(dcb, "\n"); - - dcb_printf(dcb, "\tNumber of AVRO clients: %u\n", - router_inst->stats.n_clients); - } /** @@ -197,7 +190,6 @@ static json_t* diagnostics_json(const MXS_ROUTER *router) json_object_set_new(rval, "gtid", json_string(pathbuf)); json_object_set_new(rval, "gtid_timestamp", json_integer(router_inst->gtid.timestamp)); json_object_set_new(rval, "gtid_event_number", json_integer(router_inst->gtid.event_num)); - json_object_set_new(rval, "clients", json_integer(router_inst->stats.n_clients)); return rval; } diff --git a/server/modules/routing/avrorouter/avro_rbr.cc b/server/modules/routing/avrorouter/avro_rbr.cc index a5392a285..9c8f6a96c 100644 --- a/server/modules/routing/avrorouter/avro_rbr.cc +++ b/server/modules/routing/avrorouter/avro_rbr.cc @@ -30,10 +30,6 @@ static bool warn_bit = false; /**< Remove when support for BIT is added */ static bool warn_large_enumset = false; /**< Remove when support for ENUM/SET values * larger than 255 is added */ -void notify_all_clients(Avro *router); -void add_used_table(Avro* router, const char* table); - - uint8_t* process_row_event_data(STableMapEvent map, STableCreateEvent create, RowEventHandler* conv, uint8_t *ptr, uint8_t *columns_present, uint8_t *end); @@ -262,7 +258,6 @@ bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr) rows++; } - add_used_table(router, table_ident); rval = true; } else if (!ok) diff --git a/server/modules/routing/avrorouter/avrorouter.hh b/server/modules/routing/avrorouter/avrorouter.hh index a77106f72..72accb67d 100644 --- a/server/modules/routing/avrorouter/avrorouter.hh +++ b/server/modules/routing/avrorouter/avrorouter.hh @@ -52,8 +52,6 @@ MXS_BEGIN_DECLS #define GTID_TABLE_NAME "gtid" #define USED_TABLES_TABLE_NAME "used_tables" -#define MEMORY_DATABASE_NAME "memory" -#define MEMORY_TABLE_NAME MEMORY_DATABASE_NAME".mem_used_tables" #define INDEX_TABLE_NAME "indexing_progress" /** Name of the file where the binlog to Avro conversion progress is stored */ @@ -246,37 +244,29 @@ private: void rotate_avro_file(std::string fullname); }; -extern void read_table_info(uint8_t *ptr, uint8_t post_header_len, uint64_t *table_id, - char* dest, size_t len); -extern TableMapEvent *table_map_alloc(uint8_t *ptr, uint8_t hdr_len, TableCreateEvent* create); -extern TableCreateEvent* table_create_alloc(char* ident, const char* sql, int len); -extern TableCreateEvent* table_create_copy(Avro *router, const char* sql, size_t len, const char* db); -extern bool table_create_save(TableCreateEvent *create, const char *filename); -extern bool table_create_alter(TableCreateEvent *create, const char *sql, const char *end); -extern void read_table_identifier(const char* db, const char *sql, const char *end, char *dest, int size); -extern int avro_client_handle_request(Avro *, AvroSession *, GWBUF *); -extern void avro_client_rotate(Avro *router, AvroSession *client, uint8_t *ptr); -extern bool avro_open_binlog(const char *binlogdir, const char *file, int *fd); -extern void avro_close_binlog(int fd); -extern avro_binlog_end_t avro_read_all_events(Avro *router); -extern char* json_new_schema_from_table(const STableMapEvent& map, const STableCreateEvent& create); -extern bool handle_table_map_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr); -extern bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr); +void read_table_info(uint8_t *ptr, uint8_t post_header_len, uint64_t *table_id, char* dest, size_t len); +TableMapEvent *table_map_alloc(uint8_t *ptr, uint8_t hdr_len, TableCreateEvent* create); +TableCreateEvent* table_create_alloc(char* ident, const char* sql, int len); +TableCreateEvent* table_create_copy(Avro *router, const char* sql, size_t len, const char* db); +bool table_create_save(TableCreateEvent *create, const char *filename); +bool table_create_alter(TableCreateEvent *create, const char *sql, const char *end); +TableCreateEvent* table_create_from_schema(const char* file, const char* db, const char* table, + int version); +void read_table_identifier(const char* db, const char *sql, const char *end, char *dest, int size); +int avro_client_handle_request(Avro *, AvroSession *, GWBUF *); +void avro_client_rotate(Avro *router, AvroSession *client, uint8_t *ptr); +bool avro_open_binlog(const char *binlogdir, const char *file, int *fd); +void avro_close_binlog(int fd); +avro_binlog_end_t avro_read_all_events(Avro *router); +char* json_new_schema_from_table(const STableMapEvent& map, const STableCreateEvent& create); +bool handle_table_map_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr); +bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr); void handle_one_event(Avro* router, uint8_t* ptr, REP_HEADER& hdr, uint64_t& pos); REP_HEADER construct_header(uint8_t* ptr); bool avro_save_conversion_state(Avro *router); void avro_update_index(Avro* router); - -#define AVRO_CLIENT_UNREGISTERED 0x0000 -#define AVRO_CLIENT_REGISTERED 0x0001 -#define AVRO_CLIENT_REQUEST_DATA 0x0002 -#define AVRO_CLIENT_ERRORED 0x0003 -#define AVRO_CLIENT_MAXSTATE 0x0003 - -/** - * Client catch-up status - */ -#define AVRO_CS_BUSY 0x0001 -#define AVRO_WAIT_DATA 0x0002 +bool avro_load_conversion_state(Avro *router); +void avro_load_metadata_from_schemas(Avro *router); +void notify_all_clients(Avro *router); MXS_END_DECLS