711 lines
		
	
	
		
			20 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			711 lines
		
	
	
		
			20 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /*
 | |
|  * Copyright (c) 2016 MariaDB Corporation Ab
 | |
|  *
 | |
|  * Use of this software is governed by the Business Source License included
 | |
|  * in the LICENSE.TXT file and at www.mariadb.com/bsl11.
 | |
|  *
 | |
|  * Change Date: 2023-11-12
 | |
|  *
 | |
|  * On the date above, in accordance with the Business Source License, use
 | |
|  * of this software will be governed by version 2 or later of the General
 | |
|  * Public License.
 | |
|  */
 | |
| 
 | |
| /**
 | |
|  * @file avro_file.c - File operations for the Avro router
 | |
|  *
 | |
|  * This file contains functions that handle the low level file operations for
 | |
|  * the Avro router. The handling of Avro data files is done via the Avro C API
 | |
|  * but the handling of MySQL format binary logs is done manually.
 | |
|  *
 | |
|  * Parts of this file have been copied from blr_file.c and modified for other
 | |
|  * uses.
 | |
|  */
 | |
| 
 | |
| #include "avrorouter.hh"
 | |
| #include <maxscale/query_classifier.hh>
 | |
| 
 | |
| #include <binlog_common.hh>
 | |
| #include <blr_constants.hh>
 | |
| #include <glob.h>
 | |
| #include <ini.h>
 | |
| #include <stdlib.h>
 | |
| #include <sys/stat.h>
 | |
| 
 | |
| #include <maxbase/alloc.h>
 | |
| #include <maxscale/maxscale.h>
 | |
| #include <maxscale/pcre2.h>
 | |
| #include <maxscale/utils.h>
 | |
| #include <maxscale/routingworker.hh>
 | |
| 
 | |
| static const char* statefile_section = "avro-conversion";
 | |
| 
 | |
| 
 | |
| /**
 | |
|  * Open a binlog file for reading
 | |
|  *
 | |
|  * @param router    The router instance
 | |
|  * @param file      The binlog file name
 | |
|  */
 | |
| bool avro_open_binlog(const char* binlogdir, const char* file, int* dest)
 | |
| {
 | |
|     char path[PATH_MAX + 1] = "";
 | |
|     int fd;
 | |
| 
 | |
|     snprintf(path, sizeof(path), "%s/%s", binlogdir, file);
 | |
| 
 | |
|     if ((fd = open(path, O_RDONLY)) == -1)
 | |
|     {
 | |
|         if (errno != ENOENT)
 | |
|         {
 | |
|             MXS_ERROR("Failed to open binlog file %s: %d, %s",
 | |
|                       path,
 | |
|                       errno,
 | |
|                       mxs_strerror(errno));
 | |
|         }
 | |
|         return false;
 | |
|     }
 | |
| 
 | |
|     if (lseek(fd, BINLOG_MAGIC_SIZE, SEEK_SET) < 4)
 | |
|     {
 | |
|         /* If for any reason the file's length is between 1 and 3 bytes
 | |
|          * then report an error. */
 | |
|         MXS_ERROR("Binlog file %s has an invalid length.", path);
 | |
|         close(fd);
 | |
|         return false;
 | |
|     }
 | |
| 
 | |
|     *dest = fd;
 | |
|     return true;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Close a binlog file
 | |
|  * @param fd Binlog file descriptor
 | |
|  */
 | |
| void avro_close_binlog(int fd)
 | |
| {
 | |
|     close(fd);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * @brief Write a new ini file with current conversion status
 | |
|  *
 | |
|  * The file is stored in the cache directory as 'avro-conversion.ini'.
 | |
|  * @param router Avro router instance
 | |
|  * @return True if the file was written successfully to disk
 | |
|  *
 | |
|  */
 | |
| bool avro_save_conversion_state(Avro* router)
 | |
| {
 | |
|     FILE* config_file;
 | |
|     char filename[PATH_MAX + 1];
 | |
| 
 | |
|     snprintf(filename, sizeof(filename), "%s/" AVRO_PROGRESS_FILE ".tmp", router->avrodir.c_str());
 | |
| 
 | |
|     /* open file for writing */
 | |
|     config_file = fopen(filename, "wb");
 | |
| 
 | |
|     if (config_file == NULL)
 | |
|     {
 | |
|         MXS_ERROR("Failed to open file '%s': %d, %s",
 | |
|                   filename,
 | |
|                   errno,
 | |
|                   mxs_strerror(errno));
 | |
|         return false;
 | |
|     }
 | |
| 
 | |
|     gtid_pos_t gtid = router->handler.get_gtid();
 | |
|     fprintf(config_file, "[%s]\n", statefile_section);
 | |
|     fprintf(config_file, "position=%lu\n", router->current_pos);
 | |
|     fprintf(config_file,
 | |
|             "gtid=%lu-%lu-%lu:%lu\n",
 | |
|             gtid.domain,
 | |
|             gtid.server_id,
 | |
|             gtid.seq,
 | |
|             gtid.event_num);
 | |
|     fprintf(config_file, "file=%s\n", router->binlog_name.c_str());
 | |
|     fclose(config_file);
 | |
| 
 | |
|     /* rename tmp file to right filename */
 | |
|     char newname[PATH_MAX + 1];
 | |
|     snprintf(newname, sizeof(newname), "%s/" AVRO_PROGRESS_FILE, router->avrodir.c_str());
 | |
|     int rc = rename(filename, newname);
 | |
| 
 | |
|     if (rc == -1)
 | |
|     {
 | |
|         MXS_ERROR("Failed to rename file '%s' to '%s': %d, %s",
 | |
|                   filename,
 | |
|                   newname,
 | |
|                   errno,
 | |
|                   mxs_strerror(errno));
 | |
|         return false;
 | |
|     }
 | |
| 
 | |
|     return true;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * @brief Callback for the @c ini_parse of the stored conversion position
 | |
|  *
 | |
|  * @param data User provided data
 | |
|  * @param section Section name
 | |
|  * @param key Parameter name
 | |
|  * @param value Parameter value
 | |
|  * @return 1 if the parsing should continue, 0 if an error was detected
 | |
|  */
 | |
| static int conv_state_handler(void* data, const char* section, const char* key, const char* value)
 | |
| {
 | |
|     Avro* router = (Avro*) data;
 | |
| 
 | |
|     if (strcmp(section, statefile_section) == 0)
 | |
|     {
 | |
|         if (strcmp(key, "gtid") == 0)
 | |
|         {
 | |
|             gtid_pos_t gtid;
 | |
|             MXB_AT_DEBUG(bool rval = ) gtid.parse(value);
 | |
|             mxb_assert(rval);
 | |
|             router->handler.set_gtid(gtid);
 | |
|         }
 | |
|         else if (strcmp(key, "position") == 0)
 | |
|         {
 | |
|             router->current_pos = strtol(value, NULL, 10);
 | |
|         }
 | |
|         else if (strcmp(key, "file") == 0)
 | |
|         {
 | |
|             size_t len = strlen(value);
 | |
| 
 | |
|             if (len > BINLOG_FNAMELEN)
 | |
|             {
 | |
|                 MXS_ERROR("Provided value %s for key 'file' is too long. "
 | |
|                           "The maximum allowed length is %d.",
 | |
|                           value,
 | |
|                           BINLOG_FNAMELEN);
 | |
|                 return 0;
 | |
|             }
 | |
| 
 | |
|             router->binlog_name = value;
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|             return 0;
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     return 1;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * @brief Load a stored conversion state from file
 | |
|  *
 | |
|  * @param router Avro router instance
 | |
|  * @return True if the stored state was loaded successfully
 | |
|  */
 | |
| bool avro_load_conversion_state(Avro* router)
 | |
| {
 | |
|     char filename[PATH_MAX + 1];
 | |
|     bool rval = false;
 | |
| 
 | |
|     snprintf(filename, sizeof(filename), "%s/" AVRO_PROGRESS_FILE, router->avrodir.c_str());
 | |
| 
 | |
|     /** No stored state, this is the first time the router is started */
 | |
|     if (access(filename, F_OK) == -1)
 | |
|     {
 | |
|         return true;
 | |
|     }
 | |
| 
 | |
|     MXS_NOTICE("[%s] Loading stored conversion state: %s", router->service->name(), filename);
 | |
| 
 | |
|     int rc = ini_parse(filename, conv_state_handler, router);
 | |
| 
 | |
|     switch (rc)
 | |
|     {
 | |
|     case 0:
 | |
|         {
 | |
|             rval = true;
 | |
|             gtid_pos_t gtid = router->handler.get_gtid();
 | |
|             MXS_NOTICE(
 | |
|                 "Loaded stored binary log conversion state: File: [%s] Position: [%ld] GTID: [%lu-%lu-%lu:%lu]",
 | |
|                 router->binlog_name.c_str(),
 | |
|                 router->current_pos,
 | |
|                 gtid.domain,
 | |
|                 gtid.server_id,
 | |
|                 gtid.seq,
 | |
|                 gtid.event_num);
 | |
|         }
 | |
|         break;
 | |
| 
 | |
|     case -1:
 | |
|         MXS_ERROR("Failed to open file '%s'. ", filename);
 | |
|         break;
 | |
| 
 | |
|     case -2:
 | |
|         MXS_ERROR("Failed to allocate enough memory when parsing file '%s'. ", filename);
 | |
|         break;
 | |
| 
 | |
|     default:
 | |
|         MXS_ERROR("Failed to parse stored conversion state '%s', error "
 | |
|                   "on line %d. ",
 | |
|                   filename,
 | |
|                   rc);
 | |
|         break;
 | |
|     }
 | |
| 
 | |
|     return rval;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * @brief Rotate to next file if it exists
 | |
|  *
 | |
|  * @param router Avro router instance
 | |
|  * @param pos Current position, used for logging
 | |
|  * @param stop_seen If a stop event was seen when processing current file
 | |
|  * @return AVRO_OK if the next file exists, AVRO_LAST_FILE if this is the last
 | |
|  * available file.
 | |
|  */
 | |
| static avro_binlog_end_t rotate_to_next_file_if_exists(Avro* router, uint64_t pos)
 | |
| {
 | |
|     avro_binlog_end_t rval = AVRO_LAST_FILE;
 | |
| 
 | |
|     if (binlog_next_file_exists(router->binlogdir.c_str(), router->binlog_name.c_str()))
 | |
|     {
 | |
|         char next_binlog[BINLOG_FNAMELEN + 1];
 | |
|         if (snprintf(next_binlog,
 | |
|                      sizeof(next_binlog),
 | |
|                      BINLOG_NAMEFMT,
 | |
|                      router->filestem.c_str(),
 | |
|                      blr_file_get_next_binlogname(router->binlog_name.c_str())) >= (int)sizeof(next_binlog))
 | |
|         {
 | |
|             MXS_ERROR("Next binlog name did not fit into the allocated buffer "
 | |
|                       "but was truncated, aborting: %s",
 | |
|                       next_binlog);
 | |
|             rval = AVRO_BINLOG_ERROR;
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|             MXS_INFO("End of binlog file [%s] at %lu. Rotating to next binlog file [%s].",
 | |
|                      router->binlog_name.c_str(),
 | |
|                      pos,
 | |
|                      next_binlog);
 | |
|             rval = AVRO_OK;
 | |
|             router->binlog_name = next_binlog;
 | |
|             router->current_pos = 4;
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     return rval;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * @brief Rotate to a specific file
 | |
|  *
 | |
|  * This rotates the current binlog file being processed to a specific file.
 | |
|  * Currently this is only used to rotate to files that rotate events point to.
 | |
|  * @param router Avro router instance
 | |
|  * @param pos Current position, only used for logging
 | |
|  * @param next_binlog The next file to rotate to
 | |
|  */
 | |
| static void rotate_to_file(Avro* router, uint64_t pos, const char* next_binlog)
 | |
| {
 | |
|     MXS_NOTICE("End of binlog file [%s] at %lu. Rotating to file [%s].",
 | |
|                router->binlog_name.c_str(),
 | |
|                pos,
 | |
|                next_binlog);
 | |
|     router->binlog_name = next_binlog;
 | |
|     router->current_pos = 4;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * @brief Read the replication event payload
 | |
|  *
 | |
|  * @param router Avro router instance
 | |
|  * @param hdr Replication header
 | |
|  * @param pos Starting position of the event header
 | |
|  * @return The event data or NULL if an error occurred
 | |
|  */
 | |
| static GWBUF* read_event_data(Avro* router, REP_HEADER* hdr, uint64_t pos)
 | |
| {
 | |
|     GWBUF* result;
 | |
|     /* Allocate a GWBUF for the event */
 | |
|     if ((result = gwbuf_alloc(hdr->event_size - BINLOG_EVENT_HDR_LEN + 1)))
 | |
|     {
 | |
|         uint8_t* data = GWBUF_DATA(result);
 | |
|         int n = pread(router->binlog_fd,
 | |
|                       data,
 | |
|                       hdr->event_size - BINLOG_EVENT_HDR_LEN,
 | |
|                       pos + BINLOG_EVENT_HDR_LEN);
 | |
|         /** NULL-terminate for QUERY_EVENT processing */
 | |
|         data[hdr->event_size - BINLOG_EVENT_HDR_LEN] = '\0';
 | |
| 
 | |
|         if (n != static_cast<int>(hdr->event_size - BINLOG_EVENT_HDR_LEN))
 | |
|         {
 | |
|             if (n == -1)
 | |
|             {
 | |
|                 MXS_ERROR("Error reading the event at %lu in %s. "
 | |
|                           "%s, expected %d bytes.",
 | |
|                           pos,
 | |
|                           router->binlog_name.c_str(),
 | |
|                           mxs_strerror(errno),
 | |
|                           hdr->event_size - BINLOG_EVENT_HDR_LEN);
 | |
|             }
 | |
|             else
 | |
|             {
 | |
|                 MXS_ERROR("Short read when reading the event at %lu in %s. "
 | |
|                           "Expected %d bytes got %d bytes.",
 | |
|                           pos,
 | |
|                           router->binlog_name.c_str(),
 | |
|                           hdr->event_size - BINLOG_EVENT_HDR_LEN,
 | |
|                           n);
 | |
|             }
 | |
|             gwbuf_free(result);
 | |
|             result = NULL;
 | |
|         }
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         MXS_ERROR("Failed to allocate memory for binlog entry, "
 | |
|                   "size %d at %lu.",
 | |
|                   hdr->event_size,
 | |
|                   pos);
 | |
|     }
 | |
|     return result;
 | |
| }
 | |
| 
 | |
| bool notify_cb(DCB* dcb, void* data)
 | |
| {
 | |
|     SERVICE* service = static_cast<SERVICE*>(data);
 | |
| 
 | |
|     if (dcb->service == service && dcb->role == DCB::Role::CLIENT)
 | |
|     {
 | |
|         auto session = (AvroSession*)dcb->session->router_session;
 | |
|         session->queue_client_callback();
 | |
|     }
 | |
| 
 | |
|     return true;
 | |
| }
 | |
| 
 | |
| void notify_all_clients(SERVICE* service)
 | |
| {
 | |
|     auto worker = mxs::RoutingWorker::get(mxs::RoutingWorker::MAIN);
 | |
|     worker->execute([service]() {
 | |
|                         dcb_foreach(notify_cb, service);
 | |
|                     }, mxs::RoutingWorker::EXECUTE_AUTO);
 | |
| }
 | |
| 
 | |
| void do_checkpoint(Avro* router)
 | |
| {
 | |
|     router->handler.flush();
 | |
|     avro_save_conversion_state(router);
 | |
|     notify_all_clients(router->service);
 | |
|     router->row_count = router->trx_count = 0;
 | |
| }
 | |
| 
 | |
| bool read_header(Avro* router, unsigned long long pos, REP_HEADER* hdr, avro_binlog_end_t* rc)
 | |
| {
 | |
|     uint8_t hdbuf[BINLOG_EVENT_HDR_LEN];
 | |
|     int n = pread(router->binlog_fd, hdbuf, BINLOG_EVENT_HDR_LEN, pos);
 | |
| 
 | |
|     /* Read the header information from the file */
 | |
|     if (n != BINLOG_EVENT_HDR_LEN)
 | |
|     {
 | |
|         switch (n)
 | |
|         {
 | |
|         case 0:
 | |
|             break;
 | |
| 
 | |
|         case -1:
 | |
|             MXS_ERROR("Failed to read binlog file %s at position %llu (%s).",
 | |
|                       router->binlog_name.c_str(),
 | |
|                       pos,
 | |
|                       mxs_strerror(errno));
 | |
|             break;
 | |
| 
 | |
|         default:
 | |
|             MXS_ERROR("Short read when reading the header. "
 | |
|                       "Expected 19 bytes but got %d bytes. "
 | |
|                       "Binlog file is %s, position %llu",
 | |
|                       n,
 | |
|                       router->binlog_name.c_str(),
 | |
|                       pos);
 | |
|             break;
 | |
|         }
 | |
| 
 | |
|         router->current_pos = pos;
 | |
|         *rc = n == 0 ? AVRO_OK : AVRO_BINLOG_ERROR;
 | |
|         return false;
 | |
|     }
 | |
| 
 | |
|     bool rval = true;
 | |
| 
 | |
|     *hdr = construct_header(hdbuf);
 | |
| 
 | |
|     if (hdr->event_type > MAX_EVENT_TYPE_MARIADB10)
 | |
|     {
 | |
|         MXS_ERROR("Invalid MariaDB 10 event type 0x%x. Binlog file is %s, position %llu",
 | |
|                   hdr->event_type,
 | |
|                   router->binlog_name.c_str(),
 | |
|                   pos);
 | |
|         router->current_pos = pos;
 | |
|         *rc = AVRO_BINLOG_ERROR;
 | |
|         rval = false;
 | |
|     }
 | |
|     else if (hdr->event_size <= 0)
 | |
|     {
 | |
|         MXS_ERROR("Event size error: size %d at %llu.", hdr->event_size, pos);
 | |
|         router->current_pos = pos;
 | |
|         *rc = AVRO_BINLOG_ERROR;
 | |
|         rval = false;
 | |
|     }
 | |
| 
 | |
|     return rval;
 | |
| }
 | |
| 
 | |
| static bool pos_is_ok(Avro* router, const REP_HEADER& hdr, uint64_t pos)
 | |
| {
 | |
|     bool rval = false;
 | |
| 
 | |
|     if (hdr.next_pos > 0 && hdr.next_pos < pos)
 | |
|     {
 | |
|         MXS_INFO("Binlog %s: next pos %u < pos %lu, truncating to %lu",
 | |
|                  router->binlog_name.c_str(),
 | |
|                  hdr.next_pos,
 | |
|                  pos,
 | |
|                  pos);
 | |
|     }
 | |
|     else if (hdr.next_pos > 0 && hdr.next_pos != (pos + hdr.event_size))
 | |
|     {
 | |
|         MXS_INFO("Binlog %s: next pos %u != (pos %lu + event_size %u), truncating to %lu",
 | |
|                  router->binlog_name.c_str(),
 | |
|                  hdr.next_pos,
 | |
|                  pos,
 | |
|                  hdr.event_size,
 | |
|                  pos);
 | |
|     }
 | |
|     else if (hdr.next_pos > 0)
 | |
|     {
 | |
|         rval = true;
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         MXS_ERROR("Current event type %d @ %lu has nex pos = %u : exiting",
 | |
|                   hdr.event_type,
 | |
|                   pos,
 | |
|                   hdr.next_pos);
 | |
|     }
 | |
| 
 | |
|     return rval;
 | |
| }
 | |
| 
 | |
| bool read_fde(Avro* router)
 | |
| {
 | |
|     bool rval = false;
 | |
|     avro_binlog_end_t rc;
 | |
|     REP_HEADER hdr;
 | |
| 
 | |
|     if (read_header(router, 4, &hdr, &rc))
 | |
|     {
 | |
|         if (GWBUF* result = read_event_data(router, &hdr, 4))
 | |
|         {
 | |
|             router->handler.handle_event(hdr, GWBUF_DATA(result));
 | |
|             rval = true;
 | |
|         }
 | |
|     }
 | |
|     else if (rc == AVRO_OK)
 | |
|     {
 | |
|         // Empty file
 | |
|         rval = true;
 | |
|     }
 | |
| 
 | |
|     return rval;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * @brief Read all replication events from a binlog file.
 | |
|  *
 | |
|  * Routine detects errors and pending transactions
 | |
|  *
 | |
|  * @param router        The router instance
 | |
|  * @param fix           Whether to fix or not errors
 | |
|  * @param debug         Whether to enable or not the debug for events
 | |
|  * @return              How the binlog was closed
 | |
|  * @see enum avro_binlog_end
 | |
|  */
 | |
| avro_binlog_end_t avro_read_all_events(Avro* router)
 | |
| {
 | |
|     mxs::WatchdogWorkaround workaround;
 | |
|     mxb_assert(router->binlog_fd != -1);
 | |
| 
 | |
|     if (!read_fde(router))
 | |
|     {
 | |
|         MXS_ERROR("Failed to read the FDE event from the binary log: %d, %s",
 | |
|                   errno,
 | |
|                   mxs_strerror(errno));
 | |
|         return AVRO_BINLOG_ERROR;
 | |
|     }
 | |
| 
 | |
|     uint64_t pos = router->current_pos;
 | |
|     std::string next_binlog;
 | |
|     bool rotate_seen = false;
 | |
| 
 | |
|     while (!maxscale_is_shutting_down())
 | |
|     {
 | |
|         avro_binlog_end_t rc;
 | |
|         REP_HEADER hdr;
 | |
| 
 | |
|         if (!read_header(router, pos, &hdr, &rc))
 | |
|         {
 | |
|             if (rc == AVRO_OK)
 | |
|             {
 | |
|                 do_checkpoint(router);
 | |
| 
 | |
|                 if (rotate_seen)
 | |
|                 {
 | |
|                     rotate_to_file(router, pos, next_binlog.c_str());
 | |
|                 }
 | |
|                 else
 | |
|                 {
 | |
|                     rc = rotate_to_next_file_if_exists(router, pos);
 | |
|                 }
 | |
|             }
 | |
|             return rc;
 | |
|         }
 | |
| 
 | |
|         GWBUF* result = read_event_data(router, &hdr, pos);
 | |
| 
 | |
|         if (result == NULL)
 | |
|         {
 | |
|             router->current_pos = pos;
 | |
|             return AVRO_BINLOG_ERROR;
 | |
|         }
 | |
| 
 | |
|         /* get event content */
 | |
|         uint8_t* ptr = GWBUF_DATA(result);
 | |
| 
 | |
|         // These events are only related to binary log files
 | |
|         if (hdr.event_type == ROTATE_EVENT)
 | |
|         {
 | |
|             int len = hdr.event_size - BINLOG_EVENT_HDR_LEN - 8 - (router->handler.have_checksums() ? 4 : 0);
 | |
|             next_binlog.assign((char*)ptr + 8, len);
 | |
|             rotate_seen = true;
 | |
|         }
 | |
|         else if (hdr.event_type == MARIADB_ANNOTATE_ROWS_EVENT)
 | |
|         {
 | |
|             // This appears to need special handling
 | |
|             int annotate_len = hdr.event_size - BINLOG_EVENT_HDR_LEN
 | |
|                 - (router->handler.have_checksums() ? 4 : 0);
 | |
|             MXS_INFO("Annotate_rows_event: %.*s", annotate_len, ptr);
 | |
|             pos += hdr.event_size;
 | |
|             router->current_pos = pos;
 | |
|             gwbuf_free(result);
 | |
|             continue;
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|             if ((hdr.event_type >= WRITE_ROWS_EVENTv0 && hdr.event_type <= DELETE_ROWS_EVENTv1)
 | |
|                 || (hdr.event_type >= WRITE_ROWS_EVENTv2 && hdr.event_type <= DELETE_ROWS_EVENTv2))
 | |
|             {
 | |
|                 router->row_count++;
 | |
|             }
 | |
|             else if (hdr.event_type == XID_EVENT)
 | |
|             {
 | |
|                 router->trx_count++;
 | |
|             }
 | |
| 
 | |
|             router->handler.handle_event(hdr, ptr);
 | |
|         }
 | |
| 
 | |
|         gwbuf_free(result);
 | |
| 
 | |
|         if (router->row_count >= router->row_target
 | |
|             || router->trx_count >= router->trx_target)
 | |
|         {
 | |
|             do_checkpoint(router);
 | |
|         }
 | |
| 
 | |
|         if (pos_is_ok(router, hdr, pos))
 | |
|         {
 | |
|             pos = hdr.next_pos;
 | |
|             router->current_pos = pos;
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|             break;
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     return AVRO_BINLOG_ERROR;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Read the field names from the stored Avro schemas
 | |
|  *
 | |
|  * @param router Router instance
 | |
|  */
 | |
| void avro_load_metadata_from_schemas(Avro* router)
 | |
| {
 | |
|     char path[PATH_MAX + 1];
 | |
|     snprintf(path, sizeof(path), "%s/*.avsc", router->avrodir.c_str());
 | |
|     glob_t files;
 | |
| 
 | |
|     if (glob(path, 0, NULL, &files) != GLOB_NOMATCH)
 | |
|     {
 | |
|         char db[MYSQL_DATABASE_MAXLEN + 1], table[MYSQL_TABLE_MAXLEN + 1];
 | |
|         char table_ident[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 2];
 | |
|         int version = 0;
 | |
| 
 | |
|         /** Glob sorts the files in ascending order which means that processing
 | |
|          * them in reverse should give us the newest schema first. */
 | |
|         for (int i = files.gl_pathc - 1; i > -1; i--)
 | |
|         {
 | |
|             char* dbstart = strrchr(files.gl_pathv[i], '/');
 | |
| 
 | |
|             if (!dbstart)
 | |
|             {
 | |
|                 continue;
 | |
|             }
 | |
| 
 | |
|             dbstart++;
 | |
| 
 | |
|             char* tablestart = strchr(dbstart, '.');
 | |
| 
 | |
|             if (!tablestart)
 | |
|             {
 | |
|                 continue;
 | |
|             }
 | |
| 
 | |
|             snprintf(db, sizeof(db), "%.*s", (int)(tablestart - dbstart), dbstart);
 | |
|             tablestart++;
 | |
| 
 | |
|             char* versionstart = strchr(tablestart, '.');
 | |
| 
 | |
|             if (!versionstart)
 | |
|             {
 | |
|                 continue;
 | |
|             }
 | |
| 
 | |
|             snprintf(table, sizeof(table), "%.*s", (int)(versionstart - tablestart), tablestart);
 | |
|             versionstart++;
 | |
| 
 | |
|             char* suffix = strchr(versionstart, '.');
 | |
|             char* versionend = NULL;
 | |
|             version = strtol(versionstart, &versionend, 10);
 | |
| 
 | |
|             if (versionend == suffix)
 | |
|             {
 | |
|                 snprintf(table_ident, sizeof(table_ident), "%s.%s", db, table);
 | |
|                 STableCreateEvent created(table_create_from_schema(files.gl_pathv[i],
 | |
|                                                                    db,
 | |
|                                                                    table,
 | |
|                                                                    version));
 | |
|                 router->handler.add_create(created);
 | |
|             }
 | |
|             else
 | |
|             {
 | |
|                 MXS_ERROR("Malformed schema file name: %s", files.gl_pathv[i]);
 | |
|             }
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     globfree(&files);
 | |
| }
 | 
