/* * Copyright (c) 2016 MariaDB Corporation Ab * * Use of this software is governed by the Business Source License included * in the LICENSE.TXT file and at www.mariadb.com/bsl11. * * Change Date: 2020-01-01 * * On the date above, in accordance with the Business Source License, use * of this software will be governed by version 2 or later of the General * Public License. */ /** * @file avro.c - Avro router, allows MaxScale to act as an intermediary for * MySQL replication binlog files and AVRO binary files * * @verbatim * Revision History * * Date Who Description * 25/02/2016 Massimiliano Pinto Initial implementation * * @endverbatim */ #include "avrorouter.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #ifndef BINLOG_NAMEFMT #define BINLOG_NAMEFMT "%s.%06d" #endif #define AVRO_TASK_DELAY_MAX 15 static const char* avro_task_name = "binlog_to_avro"; static const char* index_task_name = "avro_indexing"; static const char* avro_index_name = "avro.index"; /** For detection of CREATE/ALTER TABLE statements */ static const char* create_table_regex = "(?i)create[a-z0-9[:space:]_]+table"; static const char* alter_table_regex = "(?i)alter[[:space:]]+table.*column"; /* The router entry points */ static MXS_ROUTER *createInstance(SERVICE *service, char **options); static MXS_ROUTER_SESSION *newSession(MXS_ROUTER *instance, MXS_SESSION *session); static void closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session); static void freeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session); static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *queue); static void diagnostics(MXS_ROUTER *instance, DCB *dcb); static json_t* diagnostics_json(const MXS_ROUTER *instance); static void clientReply(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *queue, DCB *backend_dcb); static void errorReply(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *message, DCB *backend_dcb, mxs_error_action_t action, bool *succp); static uint64_t getCapabilities(MXS_ROUTER* instance); extern int MaxScaleUptime(); extern void avro_get_used_tables(AVRO_INSTANCE *router, DCB *dcb); void converter_func(void* data); bool binlog_next_file_exists(const char* binlogdir, const char* binlog); int blr_file_get_next_binlogname(const char *router); bool avro_load_conversion_state(AVRO_INSTANCE *router); void avro_load_metadata_from_schemas(AVRO_INSTANCE *router); int avro_client_callback(DCB *dcb, DCB_REASON reason, void *userdata); static bool ensure_dir_ok(const char* path, int mode); bool avro_save_conversion_state(AVRO_INSTANCE *router); static void stats_func(void *); void avro_index_file(AVRO_INSTANCE *router, const char* path); void avro_update_index(AVRO_INSTANCE* router); static bool conversion_task_ctl(AVRO_INSTANCE *inst, bool start); static SPINLOCK instlock; static AVRO_INSTANCE *instances; bool avro_handle_convert(const MODULECMD_ARG *args, json_t** output) { bool rval = false; if (strcmp(args->argv[1].value.string, "start") == 0 && conversion_task_ctl((AVRO_INSTANCE*)args->argv[0].value.service->router_instance, true)) { MXS_NOTICE("Started conversion for service '%s'.", args->argv[0].value.service->name); rval = true; } else if (strcmp(args->argv[1].value.string, "stop") == 0 && conversion_task_ctl((AVRO_INSTANCE*)args->argv[0].value.service->router_instance, false)) { MXS_NOTICE("Stopped conversion for service '%s'.", args->argv[0].value.service->name); rval = true; } return rval; } static const MXS_ENUM_VALUE codec_values[] = { {"null", MXS_AVRO_CODEC_NULL}, {"deflate", MXS_AVRO_CODEC_DEFLATE}, // Not yet implemented // {"snappy", MXS_AVRO_CODEC_SNAPPY}, {NULL} }; /** * The module entry point routine. It is this routine that * must populate the structure that is referred to as the * "module object", this is a structure with the set of * external entry points for this module. * * @return The module object */ MXS_MODULE* MXS_CREATE_MODULE() { spinlock_init(&instlock); instances = NULL; static modulecmd_arg_type_t args[] = { { MODULECMD_ARG_SERVICE | MODULECMD_ARG_NAME_MATCHES_DOMAIN, "The avrorouter service" }, { MODULECMD_ARG_STRING, "Action, whether to 'start' or 'stop' the conversion process" } }; modulecmd_register_command(MXS_MODULE_NAME, "convert", MODULECMD_TYPE_ACTIVE, avro_handle_convert, 2, args, "Start or stop the binlog to avro conversion process"); static MXS_ROUTER_OBJECT MyObject = { createInstance, newSession, closeSession, freeSession, routeQuery, diagnostics, diagnostics_json, clientReply, errorReply, getCapabilities, NULL }; static MXS_MODULE info = { MXS_MODULE_API_ROUTER, MXS_MODULE_GA, MXS_ROUTER_VERSION, "Binlogrouter", "V1.0.0", RCAP_TYPE_NO_RSESSION | RCAP_TYPE_NO_AUTH, &MyObject, NULL, /* Process init. */ NULL, /* Process finish. */ NULL, /* Thread init. */ NULL, /* Thread finish. */ { { "binlogdir", MXS_MODULE_PARAM_PATH, NULL, MXS_MODULE_OPT_PATH_R_OK | MXS_MODULE_OPT_PATH_CREAT }, { "avrodir", MXS_MODULE_PARAM_PATH, MXS_DEFAULT_DATADIR, MXS_MODULE_OPT_PATH_R_OK | MXS_MODULE_OPT_PATH_W_OK | MXS_MODULE_OPT_PATH_CREAT }, {"source", MXS_MODULE_PARAM_SERVICE}, {"filestem", MXS_MODULE_PARAM_STRING, BINLOG_NAME_ROOT}, {"group_rows", MXS_MODULE_PARAM_COUNT, "1000"}, {"group_trx", MXS_MODULE_PARAM_COUNT, "1"}, {"start_index", MXS_MODULE_PARAM_COUNT, "1"}, {"block_size", MXS_MODULE_PARAM_SIZE, "0"}, {"codec", MXS_MODULE_PARAM_ENUM, "null", MXS_MODULE_OPT_ENUM_UNIQUE, codec_values}, {MXS_END_MODULE_PARAMS} } }; return &info; } /** * Create the required tables in the sqlite database * * @param handle SQLite handle * @return True on success, false on error */ bool create_tables(sqlite3* handle) { char* errmsg; int rc = sqlite3_exec(handle, "CREATE TABLE IF NOT EXISTS " GTID_TABLE_NAME"(domain int, server_id int, " "sequence bigint, " "avrofile varchar(255), " "position bigint, " "primary key(domain, server_id, sequence, avrofile));", NULL, NULL, &errmsg); if (rc != SQLITE_OK) { MXS_ERROR("Failed to create GTID index table '"GTID_TABLE_NAME"': %s", sqlite3_errmsg(handle)); sqlite3_free(errmsg); return false; } rc = sqlite3_exec(handle, "CREATE TABLE IF NOT EXISTS " USED_TABLES_TABLE_NAME"(domain int, server_id int, " "sequence bigint, binlog_timestamp bigint, " "table_name varchar(255));", NULL, NULL, &errmsg); if (rc != SQLITE_OK) { MXS_ERROR("Failed to create used tables table '"USED_TABLES_TABLE_NAME"': %s", sqlite3_errmsg(handle)); sqlite3_free(errmsg); return false; } rc = sqlite3_exec(handle, "CREATE TABLE IF NOT EXISTS " INDEX_TABLE_NAME"(position bigint, filename varchar(255));", NULL, NULL, &errmsg); if (rc != SQLITE_OK) { MXS_ERROR("Failed to create indexing progress table '"INDEX_TABLE_NAME"': %s", sqlite3_errmsg(handle)); sqlite3_free(errmsg); return false; } rc = sqlite3_exec(handle, "ATTACH DATABASE ':memory:' AS "MEMORY_DATABASE_NAME, NULL, NULL, &errmsg); if (rc != SQLITE_OK) { MXS_ERROR("Failed to attach in-memory database '"MEMORY_DATABASE_NAME"': %s", sqlite3_errmsg(handle)); sqlite3_free(errmsg); return false; } rc = sqlite3_exec(handle, "CREATE TABLE " MEMORY_TABLE_NAME "(domain int, server_id int, " "sequence bigint, binlog_timestamp bigint, " "table_name varchar(255), primary key (domain, server_id, sequence, table_name));", NULL, NULL, &errmsg); if (rc != SQLITE_OK) { MXS_ERROR("Failed to create in-memory used tables table '"MEMORY_DATABASE_NAME "."MEMORY_TABLE_NAME"': %s", sqlite3_errmsg(handle)); sqlite3_free(errmsg); return false; } return true; } static bool conversion_task_ctl(AVRO_INSTANCE *inst, bool start) { bool rval = false; if (!inst->service->svc_do_shutdown) { char tasknm[strlen(avro_task_name) + strlen(inst->service->name) + 2]; snprintf(tasknm, sizeof(tasknm), "%s-%s", inst->service->name, avro_task_name); /** Remove old task and create a new one */ hktask_remove(tasknm); if (!start || hktask_add(tasknm, converter_func, inst, inst->task_delay)) { rval = true; } else { MXS_ERROR("Failed to add binlog to Avro conversion task to housekeeper."); } } return rval; } /** * @brief Read router options from an external binlogrouter service * * This reads common options used by both the avrorouter and the binlogrouter * from a service that uses the binlogrouter. This way the basic configuration * details can be read from another service without the need to configure the * avrorouter with identical router options. * * @param inst Avro router instance * @param options The @c router_options of a binlogrouter instance */ void read_source_service_options(AVRO_INSTANCE *inst, const char** options) { if (options) { for (int i = 0; options[i]; i++) { char option[strlen(options[i]) + 1]; strcpy(option, options[i]); char *value = strchr(option, '='); if (value) { *value++ = '\0'; value = trim(value); if (strcmp(option, "binlogdir") == 0) { inst->binlogdir = MXS_STRDUP_A(value); MXS_INFO("Reading MySQL binlog files from %s", inst->binlogdir); } else if (strcmp(option, "filestem") == 0) { inst->fileroot = MXS_STRDUP_A(value); } } } } } /** * TABLE_CREATE free function for use with hashtable. * @param v Pointer to a TABLE_CREATE */ static void table_create_hfree(void* v) { table_create_free((TABLE_CREATE*)v); } /** * AVRO_TABLE free function for use with hashtable. * @param v Pointer to a AVRO_TABLE */ static void avro_table_hfree(void* v) { avro_table_free((AVRO_TABLE*)v); } /** * TABLE_MAP free function for use with hashtable. * @param v Pointer to a TABLE_MAP */ static void table_map_hfree(void* v) { table_map_free((TABLE_MAP*)v); } /** * Create an instance of the router for a particular service * within MaxScale. * * The process of creating the instance causes the router to register * with the master server and begin replication of the binlogs from * the master server to MaxScale. * * @param service The service this router is being create for * @param options An array of options for this query router * * @return The instance data for this new instance */ static MXS_ROUTER * createInstance(SERVICE *service, char **options) { AVRO_INSTANCE *inst; int i; if ((inst = MXS_CALLOC(1, sizeof(AVRO_INSTANCE))) == NULL) { return NULL; } memset(&inst->stats, 0, sizeof(AVRO_ROUTER_STATS)); spinlock_init(&inst->lock); spinlock_init(&inst->fileslock); inst->service = service; inst->binlog_fd = -1; inst->current_pos = 4; inst->binlog_position = 4; inst->clients = NULL; inst->next = NULL; inst->lastEventTimestamp = 0; inst->binlog_position = 0; inst->task_delay = 1; inst->row_count = 0; inst->trx_count = 0; inst->binlogdir = NULL; MXS_CONFIG_PARAMETER *params = service->svc_config_param; inst->avrodir = MXS_STRDUP_A(config_get_string(params, "avrodir")); inst->fileroot = MXS_STRDUP_A(config_get_string(params, "filestem")); inst->row_target = config_get_integer(params, "group_rows"); inst->trx_target = config_get_integer(params, "group_trx"); inst->codec = config_get_enum(params, "codec", codec_values); int first_file = config_get_integer(params, "start_index"); inst->block_size = config_get_size(params, "block_size"); MXS_CONFIG_PARAMETER *param = config_get_param(params, "source"); inst->gtid.domain = 0; inst->gtid.event_num = 0; inst->gtid.seq = 0; inst->gtid.server_id = 0; inst->gtid.timestamp = 0; memset(&inst->active_maps, 0, sizeof(inst->active_maps)); bool err = false; if (param) { SERVICE *source = service_find(param->value); ss_dassert(source); if (source) { if (strcmp(source->routerModule, "binlogrouter") == 0) { MXS_NOTICE("[%s] Using configuration options from service '%s'.", service->name, source->name); read_source_service_options(inst, (const char**)source->routerOptions); } else { MXS_ERROR("[%s] Service '%s' uses router module '%s' instead of" " 'binlogrouter'.", service->name, source->name, source->routerModule); err = true; } } } param = config_get_param(params, "binlogdir"); if (param) { inst->binlogdir = MXS_STRDUP_A(param->value); } if (options) { MXS_WARNING("Router options for Avrorouter are deprecated. Please convert them to parameters."); for (i = 0; options[i]; i++) { char *value; if ((value = strchr(options[i], '='))) { *value++ = '\0'; trim(value); trim(options[i]); if (strcmp(options[i], "binlogdir") == 0) { MXS_FREE(inst->binlogdir); inst->binlogdir = MXS_STRDUP_A(value); } else if (strcmp(options[i], "avrodir") == 0) { MXS_FREE(inst->avrodir); inst->avrodir = MXS_STRDUP_A(value); } else if (strcmp(options[i], "filestem") == 0) { MXS_FREE(inst->fileroot); inst->fileroot = MXS_STRDUP_A(value); } else if (strcmp(options[i], "group_rows") == 0) { inst->row_target = atoi(value); } else if (strcmp(options[i], "group_trx") == 0) { inst->trx_target = atoi(value); } else if (strcmp(options[i], "start_index") == 0) { first_file = MXS_MAX(1, atoi(value)); } else if (strcmp(options[i], "block_size") == 0) { inst->block_size = atoi(value); } else { MXS_WARNING("Unknown router option: '%s'", options[i]); err = true; } } else { MXS_WARNING("Unknown router option: '%s'", options[i]); err = true; } } } if (inst->binlogdir == NULL) { MXS_ERROR("No 'binlogdir' option found in source service, in parameters or in router_options."); err = true; } else if (ensure_dir_ok(inst->binlogdir, R_OK) && ensure_dir_ok(inst->avrodir, W_OK)) { snprintf(inst->binlog_name, sizeof(inst->binlog_name), BINLOG_NAMEFMT, inst->fileroot, first_file); inst->prevbinlog[0] = '\0'; MXS_NOTICE("[%s] Reading MySQL binlog files from %s", service->name, inst->binlogdir); MXS_NOTICE("[%s] Avro files stored at: %s", service->name, inst->avrodir); MXS_NOTICE("[%s] First binlog is: %s", service->name, inst->binlog_name); } if ((inst->table_maps = hashtable_alloc(1000, hashtable_item_strhash, hashtable_item_strcmp)) && (inst->open_tables = hashtable_alloc(1000, hashtable_item_strhash, hashtable_item_strcmp)) && (inst->created_tables = hashtable_alloc(1000, hashtable_item_strhash, hashtable_item_strcmp))) { hashtable_memory_fns(inst->table_maps, hashtable_item_strdup, NULL, hashtable_item_free, table_map_hfree); hashtable_memory_fns(inst->open_tables, hashtable_item_strdup, NULL, hashtable_item_free, avro_table_hfree); hashtable_memory_fns(inst->created_tables, hashtable_item_strdup, NULL, hashtable_item_free, table_create_hfree); } else { MXS_ERROR("Hashtable allocation failed. This is most likely caused " "by a lack of available memory."); err = true; } int pcreerr; size_t erroff; pcre2_code *create_re = pcre2_compile((PCRE2_SPTR) create_table_regex, PCRE2_ZERO_TERMINATED, 0, &pcreerr, &erroff, NULL); ss_dassert(create_re); // This should almost never fail pcre2_code *alter_re = pcre2_compile((PCRE2_SPTR) alter_table_regex, PCRE2_ZERO_TERMINATED, 0, &pcreerr, &erroff, NULL); ss_dassert(alter_re); // This should almost never fail if (create_re && alter_re) { inst->create_table_re = create_re; inst->alter_table_re = alter_re; } else { err = true; } char dbpath[PATH_MAX + 1]; snprintf(dbpath, sizeof(dbpath), "/%s/%s", inst->avrodir, avro_index_name); if (access(dbpath, W_OK) == 0) { MXS_NOTICE("[%s] Using existing GTID index: %s", service->name, dbpath); } if (sqlite3_open_v2(dbpath, &inst->sqlite_handle, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL) != SQLITE_OK) { MXS_ERROR("Failed to open SQLite database '%s': %s", dbpath, sqlite3_errmsg(inst->sqlite_handle)); err = true; } else if (!create_tables(inst->sqlite_handle)) { err = true; } if (err) { sqlite3_close_v2(inst->sqlite_handle); hashtable_free(inst->table_maps); hashtable_free(inst->open_tables); hashtable_free(inst->created_tables); MXS_FREE(inst->avrodir); MXS_FREE(inst->binlogdir); MXS_FREE(inst->fileroot); MXS_FREE(inst); return NULL; } /** * We have completed the creation of the instance data, so now * insert this router instance into the linked list of routers * that have been created with this module. */ spinlock_acquire(&instlock); inst->next = instances; instances = inst; spinlock_release(&instlock); /* AVRO converter init */ avro_load_conversion_state(inst); avro_load_metadata_from_schemas(inst); /* * Add tasks for statistic computation */ /** Not used currenly snprintf(task_name, BLRM_TASK_NAME_LEN, "%s stats", service->name); hktask_add(task_name, stats_func, inst, AVRO_STATS_FREQ); */ /* Start the scan, read, convert AVRO task */ conversion_task_ctl(inst, true); MXS_INFO("current MySQL binlog file is %s, pos is %lu\n", inst->binlog_name, inst->current_pos); return (MXS_ROUTER *) inst; } /** * Associate a new session with this instance of the router. * * In the case of the avrorouter a new session equates to a new slave * connecting to MaxScale and requesting binlog records. We need to go * through the slave registration process for this new slave. * * @param instance The router instance data * @param session The session itself * @return Session specific data for this session */ static MXS_ROUTER_SESSION * newSession(MXS_ROUTER *instance, MXS_SESSION *session) { AVRO_INSTANCE *inst = (AVRO_INSTANCE *) instance; AVRO_CLIENT *client; MXS_DEBUG("%lu [newSession] new router session with " "session %p, and inst %p.", pthread_self(), session, inst); if ((client = (AVRO_CLIENT *) MXS_CALLOC(1, sizeof(AVRO_CLIENT))) == NULL) { return NULL; } #if defined(SS_DEBUG) client->rses_chk_top = CHK_NUM_ROUTER_SES; client->rses_chk_tail = CHK_NUM_ROUTER_SES; #endif memset(&client->stats, 0, sizeof(AVRO_CLIENT_STATS)); atomic_add(&inst->stats.n_clients, 1); client->uuid = NULL; spinlock_init(&client->catch_lock); client->dcb = session->client_dcb; client->router = inst; client->format = AVRO_FORMAT_UNDEFINED; client->cstate = 0; client->connect_time = time(0); client->last_sent_pos = 0; memset(&client->gtid, 0, sizeof(client->gtid)); memset(&client->gtid_start, 0, sizeof(client->gtid_start)); /* Set initial state of the slave */ client->state = AVRO_CLIENT_UNREGISTERED; char dbpath[PATH_MAX + 1]; snprintf(dbpath, sizeof(dbpath), "/%s/%s", inst->avrodir, avro_index_name); /** A new handle for each client allows thread-safe use of the sqlite database */ if (sqlite3_open_v2(dbpath, &client->sqlite_handle, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL) != SQLITE_OK) { MXS_ERROR("Failed to open SQLite database '%s': %s", dbpath, sqlite3_errmsg(inst->sqlite_handle)); sqlite3_close_v2(client->sqlite_handle); } /** * Add this session to the list of active sessions. */ spinlock_acquire(&inst->lock); client->next = inst->clients; inst->clients = client; spinlock_release(&inst->lock); CHK_CLIENT_RSES(client); return (void *) client; } /** * The session is no longer required. Shutdown all operation and free memory * associated with this session. In this case a single session is associated * to a slave of MaxScale. Therefore this is called when that slave is no * longer active and should remove of reference to that slave, free memory * and prevent any further forwarding of binlog records to that slave. * * Parameters: * @param router_instance The instance of the router * @param router_cli_ses The particular session to free * */ static void freeSession(MXS_ROUTER* router_instance, MXS_ROUTER_SESSION* router_client_ses) { AVRO_INSTANCE *router = (AVRO_INSTANCE *) router_instance; AVRO_CLIENT *client = (AVRO_CLIENT *) router_client_ses; ss_debug(int prev_val = )atomic_add(&router->stats.n_clients, -1); ss_dassert(prev_val > 0); free(client->uuid); maxavro_file_close(client->file_handle); sqlite3_close_v2(client->sqlite_handle); /* * Remove the slave session form the list of slaves that are using the * router currently. */ spinlock_acquire(&router->lock); if (router->clients == client) { router->clients = client->next; } else { AVRO_CLIENT *ptr = router->clients; while (ptr != NULL && ptr->next != client) { ptr = ptr->next; } if (ptr != NULL) { ptr->next = client->next; } } spinlock_release(&router->lock); MXS_FREE(client); } /** * Close a session with the router, this is the mechanism * by which a router may cleanup data structure etc. * * @param instance The router instance data * @param router_session The session being closed */ static void closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session) { AVRO_INSTANCE *router = (AVRO_INSTANCE *) instance; AVRO_CLIENT *client = (AVRO_CLIENT *) router_session; CHK_CLIENT_RSES(client); spinlock_acquire(&client->catch_lock); spinlock_acquire(&client->file_lock); client->state = AVRO_CLIENT_UNREGISTERED; spinlock_release(&client->file_lock); spinlock_release(&client->catch_lock); } /** * We have data from the client, this is likely to be packets related to * the registration of the slave to receive binlog records. Unlike most * MaxScale routers there is no forwarding to the backend database, merely * the return of either predefined server responses that have been cached * or binlog records. * * @param instance The router instance * @param router_session The router session returned from the newSession call * @param queue The queue of data buffers to route * @return 1 on success, 0 on error */ static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *queue) { AVRO_INSTANCE *router = (AVRO_INSTANCE *) instance; AVRO_CLIENT *client = (AVRO_CLIENT *) router_session; return avro_client_handle_request(router, client, queue); } /** * Display router diagnostics * * @param instance Instance of the router * @param dcb DCB to send diagnostics to */ static void diagnostics(MXS_ROUTER *router, DCB *dcb) { AVRO_INSTANCE *router_inst = (AVRO_INSTANCE *) router; AVRO_CLIENT *session; int i = 0; char buf[40]; struct tm tm; spinlock_acquire(&router_inst->lock); session = router_inst->clients; while (session) { i++; session = session->next; } spinlock_release(&router_inst->lock); dcb_printf(dcb, "\tAVRO Converter infofile: %s/%s\n", router_inst->avrodir, AVRO_PROGRESS_FILE); dcb_printf(dcb, "\tAVRO files directory: %s\n", router_inst->avrodir); localtime_r(&router_inst->stats.lastReply, &tm); asctime_r(&tm, buf); dcb_printf(dcb, "\tBinlog directory: %s\n", router_inst->binlogdir); dcb_printf(dcb, "\tCurrent binlog file: %s\n", router_inst->binlog_name); dcb_printf(dcb, "\tCurrent binlog position: %lu\n", router_inst->current_pos); dcb_printf(dcb, "\tCurrent GTID value: %lu-%lu-%lu\n", router_inst->gtid.domain, router_inst->gtid.server_id, router_inst->gtid.seq); dcb_printf(dcb, "\tCurrent GTID timestamp: %u\n", router_inst->gtid.timestamp); dcb_printf(dcb, "\tCurrent GTID #events: %lu\n", router_inst->gtid.event_num); dcb_printf(dcb, "\tCurrent GTID affected tables: "); avro_get_used_tables(router_inst, dcb); dcb_printf(dcb, "\n"); dcb_printf(dcb, "\tNumber of AVRO clients: %u\n", router_inst->stats.n_clients); if (router_inst->clients) { dcb_printf(dcb, "\tClients:\n"); spinlock_acquire(&router_inst->lock); session = router_inst->clients; while (session) { char sync_marker_hex[SYNC_MARKER_SIZE * 2 + 1]; dcb_printf(dcb, "\t\tClient UUID: %s\n", session->uuid); dcb_printf(dcb, "\t\tClient_host_port: [%s]:%d\n", session->dcb->remote, dcb_get_port(session->dcb)); dcb_printf(dcb, "\t\tUsername: %s\n", session->dcb->user); dcb_printf(dcb, "\t\tClient DCB: %p\n", session->dcb); dcb_printf(dcb, "\t\tClient protocol: %s\n", session->dcb->service->ports->protocol); dcb_printf(dcb, "\t\tClient Output Format: %s\n", avro_client_ouput[session->format]); dcb_printf(dcb, "\t\tState: %s\n", avro_client_states[session->state]); dcb_printf(dcb, "\t\tAvro file: %s\n", session->avro_binfile); gw_bin2hex(sync_marker_hex, session->avro_file.sync, SYNC_MARKER_SIZE); dcb_printf(dcb, "\t\tAvro file SyncMarker: %s\n", sync_marker_hex); dcb_printf(dcb, "\t\tAvro file last read block: %lu\n", session->avro_file.blocks_read); dcb_printf(dcb, "\t\tAvro file last read record: %lu\n", session->avro_file.records_read); if (session->gtid_start.domain > 0 || session->gtid_start.server_id > 0 || session->gtid_start.seq > 0) { dcb_printf(dcb, "\t\tRequested GTID: %lu-%lu-%lu\n", session->gtid_start.domain, session->gtid_start.server_id, session->gtid_start.seq); } dcb_printf(dcb, "\t\tCurrent GTID: %lu-%lu-%lu\n", session->gtid.domain, session->gtid.server_id, session->gtid.seq); dcb_printf(dcb, "\t\t--------------------\n\n"); session = session->next; } spinlock_release(&router_inst->lock); } } /** * Display router diagnostics * * @param instance Instance of the router */ static json_t* diagnostics_json(const MXS_ROUTER *router) { AVRO_INSTANCE *router_inst = (AVRO_INSTANCE *)router; json_t* rval = json_object(); char pathbuf[PATH_MAX + 1]; snprintf(pathbuf, sizeof(pathbuf), "%s/%s", router_inst->avrodir, AVRO_PROGRESS_FILE); json_object_set_new(rval, "infofile", json_string(pathbuf)); json_object_set_new(rval, "avrodir", json_string(router_inst->avrodir)); json_object_set_new(rval, "binlogdir", json_string(router_inst->binlogdir)); json_object_set_new(rval, "binlog_name", json_string(router_inst->binlog_name)); json_object_set_new(rval, "binlog_pos", json_integer(router_inst->current_pos)); snprintf(pathbuf, sizeof(pathbuf), "%lu-%lu-%lu", router_inst->gtid.domain, router_inst->gtid.server_id, router_inst->gtid.seq); json_object_set_new(rval, "gtid", json_string(pathbuf)); json_object_set_new(rval, "gtid_timestamp", json_integer(router_inst->gtid.timestamp)); json_object_set_new(rval, "gtid_event_number", json_integer(router_inst->gtid.event_num)); json_object_set_new(rval, "clients", json_integer(router_inst->stats.n_clients)); if (router_inst->clients) { json_t* arr = json_array(); spinlock_acquire(&router_inst->lock); for (AVRO_CLIENT *session = router_inst->clients; session; session = session->next) { json_t* client = json_object(); json_object_set_new(client, "uuid", json_string(session->uuid)); json_object_set_new(client, "host", json_string(session->dcb->remote)); json_object_set_new(client, "port", json_integer(dcb_get_port(session->dcb))); json_object_set_new(client, "user", json_string(session->dcb->user)); json_object_set_new(client, "format", json_string(avro_client_ouput[session->format])); json_object_set_new(client, "state", json_string(avro_client_states[session->state])); json_object_set_new(client, "avrofile", json_string(session->avro_binfile)); json_object_set_new(client, "avrofile_last_block", json_integer(session->avro_file.blocks_read)); json_object_set_new(client, "avrofile_last_record", json_integer(session->avro_file.records_read)); if (session->gtid_start.domain > 0 || session->gtid_start.server_id > 0 || session->gtid_start.seq > 0) { snprintf(pathbuf, sizeof(pathbuf), "%lu-%lu-%lu", session->gtid_start.domain, session->gtid_start.server_id, session->gtid_start.seq); json_object_set_new(client, "requested_gtid", json_string(pathbuf)); } snprintf(pathbuf, sizeof(pathbuf), "%lu-%lu-%lu", session->gtid.domain, session->gtid.server_id, session->gtid.seq); json_object_set_new(client, "current_gtid", json_string(pathbuf)); json_array_append_new(arr, client); } spinlock_release(&router_inst->lock); json_object_set_new(rval, "clients", arr); } return rval; } /** * Client Reply routine - in this case this is a message from the * master server, It should be sent to the state machine that manages * master packets as it may be binlog records or part of the registration * handshake that takes part during connection establishment. * * * @param instance The router instance * @param router_session The router session * @param master_dcb The DCB for the connection to the master * @param queue The GWBUF with reply data */ static void clientReply(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *queue, DCB *backend_dcb) { /** We should never end up here */ ss_dassert(false); } /* static char * extract_message(GWBUF *errpkt) { char *rval; int len; len = EXTRACT24(errpkt->start); if ((rval = (char *) MXS_MALLOC(len)) == NULL) { return NULL; } memcpy(rval, (char *) (errpkt->start) + 7, 6); rval[6] = ' '; // message size is len - (1 byte field count + 2 bytes errno + 6 bytes status) memcpy(&rval[7], (char *) (errpkt->start) + 13, len - 9); rval[len - 2] = 0; return rval; } */ /** * Error Reply routine * * The routine will reply to client errors and/or closing the session * or try to open a new backend connection. * * @param instance The router instance * @param router_session The router session * @param message The error message to reply * @param backend_dcb The backend DCB * @param action The action: ERRACT_NEW_CONNECTION or ERRACT_REPLY_CLIENT * @param succp Result of action: true iff router can continue * */ static void errorReply(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *message, DCB *backend_dcb, mxs_error_action_t action, bool *succp) { /** We should never end up here */ ss_dassert(false); } static uint64_t getCapabilities(MXS_ROUTER* instance) { return RCAP_TYPE_NONE; } /** * The stats gathering function called from the housekeeper so that we * can get timed averages of binlog records shippped * * @param inst The router instance */ /* static void stats_func(void *inst) { AVRO_INSTANCE *router = (AVRO_INSTANCE *) inst; AVRO_CLIENT *client; router->stats.minavgs[router->stats.minno++] = router->stats.n_binlogs - router->stats.lastsample; router->stats.lastsample = router->stats.n_binlogs; if (router->stats.minno == AVRO_NSTATS_MINUTES) { router->stats.minno = 0; } spinlock_acquire(&router->lock); client = router->clients; while (client) { client->stats.minavgs[client->stats.minno++] = client->stats.n_events - client->stats.lastsample; client->stats.lastsample = client->stats.n_events; if (client->stats.minno == AVRO_NSTATS_MINUTES) { client->stats.minno = 0; } client = client->next; } spinlock_release(&router->lock); } */ /** * Conversion task: MySQL binlogs to AVRO files */ void converter_func(void* data) { AVRO_INSTANCE* router = (AVRO_INSTANCE*) data; bool ok = true; avro_binlog_end_t binlog_end = AVRO_OK; while (!router->service->svc_do_shutdown && ok && binlog_end == AVRO_OK) { uint64_t start_pos = router->current_pos; char binlog_name[BINLOG_FNAMELEN + 1]; strcpy(binlog_name, router->binlog_name); if (avro_open_binlog(router->binlogdir, router->binlog_name, &router->binlog_fd)) { binlog_end = avro_read_all_events(router); if (router->current_pos != start_pos || strcmp(binlog_name, router->binlog_name) != 0) { /** We processed some data, reset the conversion task delay */ router->task_delay = 1; /** Update the GTID index */ avro_update_index(router); } avro_close_binlog(router->binlog_fd); } else { binlog_end = AVRO_BINLOG_ERROR; } } /** We reached end of file, flush unwritten records to disk */ if (router->task_delay == 1) { avro_flush_all_tables(router, AVROROUTER_FLUSH); avro_save_conversion_state(router); } if (binlog_end == AVRO_LAST_FILE) { router->task_delay = MXS_MIN(router->task_delay + 1, AVRO_TASK_DELAY_MAX); if (conversion_task_ctl(router, true)) { MXS_INFO("Stopped processing file %s at position %lu. Waiting until" " more data is written before continuing. Next check in %d seconds.", router->binlog_name, router->current_pos, router->task_delay); } } } /** * @brief Ensure directory exists and is writable * * TODO: Move this as a function in the core * * @param path Path to directory * @param mode One of O_RDONLY, O_WRONLY or O_RDWR * @return True if directory exists and can be opened with @p mode permission */ static bool ensure_dir_ok(const char* path, int mode) { bool rval = false; if (path) { char resolved[PATH_MAX + 1]; const char *rp = realpath(path, resolved); if (rp == NULL && errno == ENOENT) { rp = path; } if (rp) { /** Make sure the directory exists */ if (mkdir(rp, 0774) == 0 || errno == EEXIST) { if (access(rp, mode) == 0) { rval = true; } else { MXS_ERROR("Failed to access directory '%s': %d, %s", rp, errno, mxs_strerror(errno)); } } else { MXS_ERROR("Failed to create directory '%s': %d, %s", rp, errno, mxs_strerror(errno)); } } else { MXS_ERROR("Failed to resolve real path name for '%s': %d, %s", path, errno, mxs_strerror(errno)); } } return rval; }