 43c33e9f4a
			
		
	
	43c33e9f4a
	
	
	
		
			
			Allocating the session before a DCB guarantees that at no point will a DCB have a null session. This further clarifies the concept of the session and also allows the listener reference to be moved there. Ideally, the session itself would allocate and assign the client DCB but since the Listener is the only one who does it, it's acceptable for now.
		
			
				
	
	
		
			3274 lines
		
	
	
		
			101 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			3274 lines
		
	
	
		
			101 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /*
 | |
|  * lint -e662
 | |
|  * lint -e661
 | |
|  */
 | |
| 
 | |
| /*
 | |
|  * Copyright (c) 2016 MariaDB Corporation Ab
 | |
|  *
 | |
|  * Use of this software is governed by the Business Source License included
 | |
|  * in the LICENSE.TXT file and at www.mariadb.com/bsl11.
 | |
|  *
 | |
|  * Change Date: 2022-01-01
 | |
|  *
 | |
|  * On the date above, in accordance with the Business Source License, use
 | |
|  * of this software will be governed by version 2 or later of the General
 | |
|  * Public License.
 | |
|  */
 | |
| 
 | |
| /**
 | |
|  * @file blr_master.c - contains code for the router to master communication
 | |
|  *
 | |
|  * The binlog router is designed to be used in replication environments to
 | |
|  * increase the replication fanout of a master server. It provides a transparent
 | |
|  * mechanism to read the binlog entries for multiple slaves while requiring
 | |
|  * only a single connection to the actual master to support the slaves.
 | |
|  *
 | |
|  * The current prototype implementation is designed to support MySQL 5.6 and has
 | |
|  * a number of limitations. This prototype is merely a proof of concept and
 | |
|  * should not be considered production ready.
 | |
|  */
 | |
| 
 | |
| #include "blr.hh"
 | |
| 
 | |
| #include <inttypes.h>
 | |
| #include <stdio.h>
 | |
| #include <stdlib.h>
 | |
| #include <string.h>
 | |
| #include <sys/socket.h>
 | |
| #include <sys/types.h>
 | |
| 
 | |
| #include <sstream>
 | |
| 
 | |
| #include <maxscale/alloc.h>
 | |
| #include <maxbase/atomic.h>
 | |
| #include <maxscale/buffer.h>
 | |
| #include <maxscale/dcb.hh>
 | |
| #include <maxscale/housekeeper.h>
 | |
| #include <maxscale/log.h>
 | |
| #include <maxscale/protocol/mysql.h>
 | |
| #include <maxscale/router.hh>
 | |
| #include <maxscale/routingworker.h>
 | |
| #include <maxbase/worker.hh>
 | |
| #include <maxscale/server.hh>
 | |
| #include <maxscale/service.hh>
 | |
| #include <maxscale/session.hh>
 | |
| #include <maxscale/utils.h>
 | |
| 
 | |
| #include "../../../core/internal/session.hh"
 | |
| 
 | |
| static GWBUF* blr_make_query(DCB* dcb, char* query);
 | |
| static GWBUF* blr_make_registration(ROUTER_INSTANCE* router);
 | |
| static GWBUF* blr_make_binlog_dump(ROUTER_INSTANCE* router);
 | |
| void          encode_value(unsigned char* data, unsigned int value, int len);
 | |
| void          blr_handle_binlog_record(ROUTER_INSTANCE* router, GWBUF* pkt);
 | |
| static void*  CreateMySQLAuthData(const char* username,
 | |
|                                   const char* password,
 | |
|                                   const char* database);
 | |
| void        blr_extract_header(uint8_t* pkt, REP_HEADER* hdr);
 | |
| static void blr_log_packet(int priority, const char* msg, uint8_t* ptr, int len);
 | |
| char*       blr_extract_column(GWBUF* buf, int col);
 | |
| static bool blr_check_last_master_event(void* inst);
 | |
| extern int  blr_check_heartbeat(ROUTER_INSTANCE* router);
 | |
| static void blr_log_identity(ROUTER_INSTANCE* router);
 | |
| static void blr_extract_header_semisync(uint8_t* pkt, REP_HEADER* hdr);
 | |
| static int  blr_get_master_semisync(GWBUF* buf);
 | |
| static void blr_terminate_master_replication(ROUTER_INSTANCE* router,
 | |
|                                              uint8_t* ptr,
 | |
|                                              int len);
 | |
| extern bool blr_notify_waiting_slave(ROUTER_SLAVE* slave);
 | |
| extern bool blr_save_mariadb_gtid(ROUTER_INSTANCE* inst);
 | |
| static void blr_register_serverid(ROUTER_INSTANCE* router, GWBUF* buf);
 | |
| static bool blr_register_heartbeat(ROUTER_INSTANCE* router, GWBUF* buf);
 | |
| static void blr_register_setchecksum(ROUTER_INSTANCE* router, GWBUF* buf);
 | |
| static void blr_register_getchecksum(ROUTER_INSTANCE* router, GWBUF* buf);
 | |
| static void blr_register_handle_checksum(ROUTER_INSTANCE* router, GWBUF* buf);
 | |
| static void blr_register_serveruuid(ROUTER_INSTANCE* router, GWBUF* buf);
 | |
| static void blr_register_slaveuuid(ROUTER_INSTANCE* router, GWBUF* buf);
 | |
| static void blr_register_utf8(ROUTER_INSTANCE* router, GWBUF* buf);
 | |
| static void blr_register_selectversion(ROUTER_INSTANCE* router, GWBUF* buf);
 | |
| static void blr_register_selectvercomment(ROUTER_INSTANCE* router, GWBUF* buf);
 | |
| static void blr_register_selecthostname(ROUTER_INSTANCE* router, GWBUF* buf);
 | |
| static void blr_register_selectmap(ROUTER_INSTANCE* router, GWBUF* buf);
 | |
| static void blr_register_mxw_binlogvars(ROUTER_INSTANCE* router, GWBUF* buf);
 | |
| static void blr_register_mxw_tables(ROUTER_INSTANCE* router, GWBUF* buf);
 | |
| static void blr_register_getsemisync(ROUTER_INSTANCE* router, GWBUF* buf);
 | |
| static bool blr_register_setsemisync(ROUTER_INSTANCE* router, GWBUF* buf);
 | |
| static void blr_register_mxw_handlelowercase(ROUTER_INSTANCE* router,
 | |
|                                              GWBUF* buf);
 | |
| static void blr_register_send_command(ROUTER_INSTANCE* router,
 | |
|                                       const char* command,
 | |
|                                       unsigned int state);
 | |
| static void blr_register_cache_response(ROUTER_INSTANCE* router,
 | |
|                                         GWBUF** save_buf,
 | |
|                                         const char* save_tag,
 | |
|                                         GWBUF* in_buf);
 | |
| static void blr_start_master_registration(ROUTER_INSTANCE* router, GWBUF* buf);
 | |
| static void blr_register_mariadb_gtid_request(ROUTER_INSTANCE* router,
 | |
|                                               GWBUF* buf);
 | |
| extern int blr_write_special_event(ROUTER_INSTANCE* router,
 | |
|                                    uint32_t file_offset,
 | |
|                                    uint32_t hole_size,
 | |
|                                    REP_HEADER* hdr,
 | |
|                                    int type);
 | |
| extern int  blr_file_new_binlog(ROUTER_INSTANCE* router, char* file);
 | |
| static bool blr_handle_missing_files(ROUTER_INSTANCE* router,
 | |
|                                      char* new_file);
 | |
| extern void blr_file_update_gtid(ROUTER_INSTANCE* router);
 | |
| static int  blr_check_connect_retry(ROUTER_INSTANCE* router);
 | |
| 
 | |
| static int keepalive = 1;
 | |
| 
 | |
| /**
 | |
|  * blr_start_master - controls the connection of the binlog router to the
 | |
|  * master MySQL server and triggers the slave registration process for
 | |
|  * the router.
 | |
|  *
 | |
|  * @param   router      The router instance
 | |
|  */
 | |
| static void blr_start_master(void* data)
 | |
| {
 | |
|     ROUTER_INSTANCE* router = (ROUTER_INSTANCE*)data;
 | |
|     mxb_assert(mxs_rworker_get_current() == mxs_rworker_get(MXS_RWORKER_MAIN));
 | |
| 
 | |
|     if (router->client)
 | |
|     {
 | |
|         MXS_FREE(router->client->data);
 | |
|         router->client->data = NULL;
 | |
|         dcb_close(router->client);
 | |
|         router->client = NULL;
 | |
|     }
 | |
| 
 | |
|     router->stats.n_binlogs_ses = 0;
 | |
|     pthread_mutex_lock(&router->lock);
 | |
| 
 | |
|     if (router->master_state != BLRM_UNCONNECTED)
 | |
|     {
 | |
|         if (router->master_state != BLRM_SLAVE_STOPPED
 | |
|             && router->master_state != BLRM_CONNECTING)
 | |
|         {
 | |
|             MXS_ERROR("%s: Master Connect: Unexpected master state [%s]\n",
 | |
|                       router->service->name,
 | |
|                       blrm_states[router->master_state]);
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|             MXS_NOTICE("%s: Master Connect: binlog current state is [%s]\n",
 | |
|                        router->service->name,
 | |
|                        blrm_states[router->master_state]);
 | |
|         }
 | |
| 
 | |
|         /* Return only if state is not BLRM_CONNECTING */
 | |
|         if (router->master_state != BLRM_CONNECTING)
 | |
|         {
 | |
|             pthread_mutex_unlock(&router->lock);
 | |
|             return;
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     /* Check whether master connection can be started */
 | |
|     int connect_retry;
 | |
|     if ((connect_retry = blr_check_connect_retry(router)) == -1)
 | |
|     {
 | |
|         /* Force stopped state */
 | |
|         router->master_state = BLRM_SLAVE_STOPPED;
 | |
|         pthread_mutex_unlock(&router->lock);
 | |
| 
 | |
|         MXS_ERROR("%s: failure while connecting to master server '%s', "
 | |
|                   "reached %d maximum number of retries. "
 | |
|                   "Replication is stopped.",
 | |
|                   router->service->name,
 | |
|                   router->service->dbref->server->name,
 | |
|                   router->retry_limit);
 | |
|         return;
 | |
|     }
 | |
| 
 | |
|     /* Force connecting state */
 | |
|     router->master_state = BLRM_CONNECTING;
 | |
| 
 | |
|     pthread_mutex_unlock(&router->lock);
 | |
| 
 | |
|     // TODO: Fix this
 | |
|     DCB* client = dcb_alloc(DCB_ROLE_INTERNAL, NULL);
 | |
| 
 | |
|     /* Create fake 'client' DCB */
 | |
|     if (client == NULL)
 | |
|     {
 | |
|         MXS_ERROR("failed to create DCB for dummy client");
 | |
|         return;
 | |
|     }
 | |
|     router->client = client;
 | |
| 
 | |
|     /* Fake the client is reading */
 | |
|     client->state = DCB_STATE_POLLING;      /* Fake the client is reading */
 | |
| 
 | |
|     /* Create MySQL Athentication from configured user/passwd */
 | |
|     client->data = CreateMySQLAuthData(router->user, router->password, "");
 | |
|     // TODO: Fix this
 | |
|     client->session = new mxs::Session(nullptr);
 | |
|     router->session = client->session;
 | |
| 
 | |
|     /* Create a session for dummy client DCB */
 | |
|     if (router->session == NULL || session_start(router->session))
 | |
|     {
 | |
|         MXS_ERROR("failed to create session for connection to master");
 | |
|         return;
 | |
|     }
 | |
| 
 | |
|     /**
 | |
|      * 'client' is the fake DCB that emulates a client session:
 | |
|      * we need to set the poll.thread.id for the "dummy client"
 | |
|      */
 | |
|     client->session->client_dcb->owner = mxs_rworker_get_current();
 | |
| 
 | |
|     /* Connect to configured master server */
 | |
|     if ((router->master = dcb_connect(router->service->dbref->server,
 | |
|                                       router->session,
 | |
|                                       BLR_PROTOCOL)) == NULL)
 | |
|     {
 | |
|         pthread_mutex_lock(&router->lock);
 | |
|         router->retry_count++;
 | |
|         pthread_mutex_unlock(&router->lock);
 | |
| 
 | |
|         blr_start_master_in_main(router, connect_retry);
 | |
| 
 | |
|         MXS_ERROR("%s: failure while connecting to master server '%s', "
 | |
|                   "retrying in %d seconds",
 | |
|                   router->service->name,
 | |
|                   router->service->dbref->server->name,
 | |
|                   connect_retry);
 | |
|         return;
 | |
|     }
 | |
|     router->master->remote = MXS_STRDUP_A(router->service->dbref->server->address);
 | |
| 
 | |
|     MXS_NOTICE("%s: attempting to connect to master"
 | |
|                " server [%s]:%d, binlog='%s', pos=%lu%s%s",
 | |
|                router->service->name,
 | |
|                router->service->dbref->server->address,
 | |
|                router->service->dbref->server->port,
 | |
|                router->binlog_name,
 | |
|                router->current_pos,
 | |
|                router->mariadb10_master_gtid ? ", GTID=" : "",
 | |
|                router->mariadb10_master_gtid ? router->last_mariadb_gtid : "");
 | |
| 
 | |
|     router->connect_time = time(0);
 | |
| 
 | |
|     if (setsockopt(router->master->fd,
 | |
|                    SOL_SOCKET,
 | |
|                    SO_KEEPALIVE,
 | |
|                    &keepalive,
 | |
|                    sizeof(keepalive)))
 | |
|     {
 | |
|         perror("setsockopt");
 | |
|     }
 | |
| 
 | |
|     router->master_state = BLRM_AUTHENTICATED;
 | |
| 
 | |
|     /**
 | |
|      * Start the slave protocol registration phase.
 | |
|      * This is the first step: SELECT UNIX_TIMESTAMP()
 | |
|      *
 | |
|      * Next states are handled by blr_master_response()
 | |
|      */
 | |
| 
 | |
|     blr_register_send_command(router,
 | |
|                               "SELECT UNIX_TIMESTAMP()",
 | |
|                               BLRM_TIMESTAMP);
 | |
| 
 | |
|     router->stats.n_masterstarts++;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Callback start function to be called in the context of the main worker.
 | |
|  *
 | |
|  * @param worker  The worker in whose context the function is called.
 | |
|  * @param data    The data to be passed to `blr_start_master`
 | |
|  */
 | |
| static bool worker_cb_start_master(mxb::Worker::Call::action_t, ROUTER_INSTANCE* data)
 | |
| {
 | |
|     mxb_assert_message(mxs_rworker_get_current() == mxs_rworker_get(MXS_RWORKER_MAIN),
 | |
|                        "worker_cb_start_master must be called from the main thread");
 | |
|     blr_start_master(data);
 | |
|     return false;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Start master in the main Worker.
 | |
|  *
 | |
|  * @param data  Data intended for `blr_start_master`.
 | |
|  */
 | |
| bool blr_start_master_in_main(ROUTER_INSTANCE* data, int32_t delay)
 | |
| {
 | |
|     // The master should be connected to in the main worker, so we post it a
 | |
|     // message and call `blr_start_master` there.
 | |
| 
 | |
|     mxb::Worker* worker = (mxb::Worker*)mxs_rworker_get(MXS_RWORKER_MAIN);
 | |
|     mxb_assert(worker);
 | |
| 
 | |
|     worker->delayed_call(delay == 0 ? 1 : delay * 1000, worker_cb_start_master, data);
 | |
| 
 | |
|     return false;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Callback close function to be called in the context of the main worker.
 | |
|  *
 | |
|  * @param worker_id  The id of the worker in whose context the function is called.
 | |
|  * @param data       The data to be passed to `blr_start_master`
 | |
|  */
 | |
| static void worker_cb_close_master(MXB_WORKER* worker, void* data)
 | |
| {
 | |
|     // This is itended to be called only in the main worker.
 | |
|     mxb_assert(worker == mxs_rworker_get(MXS_RWORKER_MAIN));
 | |
| 
 | |
|     blr_master_close(static_cast<ROUTER_INSTANCE*>(data));
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Close master connection in the main Worker.
 | |
|  *
 | |
|  * @param data  Data intended for `blr_master_close`.
 | |
|  */
 | |
| void blr_close_master_in_main(void* data)
 | |
| {
 | |
|     // The master should be connected to in the main worker, so we post it a
 | |
|     // message and call `blr_master_close` there.
 | |
| 
 | |
|     MXB_WORKER* worker = mxs_rworker_get(MXS_RWORKER_MAIN);     // The worker running in the main thread.
 | |
|     mxb_assert(worker);
 | |
| 
 | |
|     intptr_t arg1 = (intptr_t)worker_cb_close_master;
 | |
|     intptr_t arg2 = (intptr_t)data;
 | |
| 
 | |
|     if (!mxb_worker_post_message(worker, MXB_WORKER_MSG_CALL, arg1, arg2))
 | |
|     {
 | |
|         MXS_ERROR("Could not post 'blr_master_close' message to main worker.");
 | |
|     }
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Reconnect to the master server.
 | |
|  *
 | |
|  * IMPORTANT - must be called with router->active_logs set by the
 | |
|  * thread that set active_logs.
 | |
|  *
 | |
|  * @param   router      The router instance
 | |
|  */
 | |
| static void blr_restart_master(ROUTER_INSTANCE* router)
 | |
| {
 | |
|     /* Now it is safe to unleash other threads on this router instance */
 | |
|     pthread_mutex_lock(&router->lock);
 | |
|     router->reconnect_pending = 0;
 | |
|     router->active_logs = 0;
 | |
| 
 | |
|     if (router->master_state < BLRM_BINLOGDUMP)
 | |
|     {
 | |
|         int connect_retry;
 | |
|         if ((connect_retry = blr_check_connect_retry(router)) == -1)
 | |
|         {
 | |
|             /* Force stopped state */
 | |
|             router->master_state = BLRM_SLAVE_STOPPED;
 | |
|             pthread_mutex_unlock(&router->lock);
 | |
| 
 | |
|             MXS_ERROR("%s: failed to connect to master server '%s', "
 | |
|                       "reached %d maximum number of retries. "
 | |
|                       "Replication is stopped.",
 | |
|                       router->service->name,
 | |
|                       router->service->dbref->server->name,
 | |
|                       router->retry_limit);
 | |
|             return;
 | |
|         }
 | |
| 
 | |
|         /* Force unconnected state */
 | |
|         router->master_state = BLRM_UNCONNECTED;
 | |
|         router->retry_count++;
 | |
| 
 | |
|         int config_index = (router->config_index + 1) % router->configs.size();
 | |
|         if (config_index != router->config_index)   // Will be different unless there is but one.
 | |
|         {
 | |
|             mxb_assert(config_index < static_cast<int>(router->configs.size()));
 | |
| 
 | |
|             const ChangeMasterConfig& old_config = router->configs[router->config_index];
 | |
|             router->config_index = config_index;
 | |
|             const ChangeMasterConfig& new_config = router->configs[router->config_index];
 | |
| 
 | |
|             blr_master_set_config(router, new_config);
 | |
| 
 | |
|             MXS_NOTICE("Connection to %s:%d failed, now trying with %s:%d.",
 | |
|                        old_config.host.c_str(),
 | |
|                        old_config.port,
 | |
|                        new_config.host.c_str(),
 | |
|                        new_config.port);
 | |
|         }
 | |
| 
 | |
|         pthread_mutex_unlock(&router->lock);
 | |
| 
 | |
|         blr_start_master_in_main(router, connect_retry);
 | |
| 
 | |
|         MXS_ERROR("%s: failed to connect to master server '%s', "
 | |
|                   "retrying in %d seconds",
 | |
|                   router->service->name,
 | |
|                   router->service->dbref->server->name,
 | |
|                   connect_retry);
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         /* Force connecting state */
 | |
|         router->master_state = BLRM_CONNECTING;
 | |
|         pthread_mutex_unlock(&router->lock);
 | |
| 
 | |
|         blr_start_master_in_main(router);
 | |
|     }
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Request a reconnect to the master.
 | |
|  *
 | |
|  * If another thread is active processing messages from the master
 | |
|  * then merely set a flag for that thread to do the restart. If no
 | |
|  * threads are active then directly call the restart routine to
 | |
|  * reconnect to the master.
 | |
|  *
 | |
|  * @param   router      The router instance
 | |
|  */
 | |
| void blr_master_reconnect(ROUTER_INSTANCE* router)
 | |
| {
 | |
|     int do_reconnect = 0;
 | |
| 
 | |
|     if (router->master_state == BLRM_SLAVE_STOPPED)
 | |
|     {
 | |
|         return;
 | |
|     }
 | |
| 
 | |
|     pthread_mutex_lock(&router->lock);
 | |
|     if (router->active_logs)
 | |
|     {
 | |
|         /* Currently processing a response, set a flag
 | |
|          * and get the thread that is process a response
 | |
|          * to deal with the reconnect.
 | |
|          */
 | |
|         router->reconnect_pending = 1;
 | |
|         router->stats.n_delayedreconnects++;
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         router->active_logs = 1;
 | |
|         do_reconnect = 1;
 | |
|     }
 | |
|     pthread_mutex_unlock(&router->lock);
 | |
|     if (do_reconnect)
 | |
|     {
 | |
|         blr_restart_master(router);
 | |
|         pthread_mutex_lock(&router->lock);
 | |
|         router->active_logs = 0;
 | |
|         pthread_mutex_unlock(&router->lock);
 | |
|     }
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Shutdown a connection to the master
 | |
|  *
 | |
|  * @param router    The router instance
 | |
|  */
 | |
| void blr_master_close(ROUTER_INSTANCE* router)
 | |
| {
 | |
|     dcb_close(router->master);
 | |
|     router->master = NULL;
 | |
| 
 | |
|     pthread_mutex_lock(&router->lock);
 | |
|     if (router->master_state != BLRM_SLAVE_STOPPED)
 | |
|     {
 | |
|         router->master_state = BLRM_UNCONNECTED;
 | |
|     }
 | |
|     router->master_event_state = BLR_EVENT_DONE;
 | |
|     pthread_mutex_unlock(&router->lock);
 | |
| 
 | |
|     gwbuf_free(router->stored_event);
 | |
|     router->stored_event = NULL;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Binlog router master side state machine event handler.
 | |
|  *
 | |
|  * Handles an incoming response from the master server to the binlog
 | |
|  * router.
 | |
|  *
 | |
|  * @param router    The router instance
 | |
|  * @param buf       The incoming packet
 | |
|  */
 | |
| void blr_master_response(ROUTER_INSTANCE* router, GWBUF* buf)
 | |
| {
 | |
|     atomic_add(&router->handling_threads, 1);
 | |
|     mxb_assert(router->handling_threads == 1);
 | |
|     pthread_mutex_lock(&router->lock);
 | |
|     router->active_logs = 1;
 | |
|     pthread_mutex_unlock(&router->lock);
 | |
| 
 | |
|     if (router->master_state > BLRM_MAXSTATE)
 | |
|     {
 | |
|         MXS_ERROR("Invalid master state machine state (%d) for binlog router.",
 | |
|                   router->master_state);
 | |
|         gwbuf_free(buf);
 | |
| 
 | |
|         pthread_mutex_lock(&router->lock);
 | |
|         if (router->reconnect_pending)
 | |
|         {
 | |
|             router->active_logs = 0;
 | |
|             pthread_mutex_unlock(&router->lock);
 | |
|             atomic_add(&router->handling_threads, -1);
 | |
|             MXS_ERROR("%s: Pending reconnect in state %s.",
 | |
|                       router->service->name,
 | |
|                       blrm_states[router->master_state]);
 | |
|             blr_restart_master(router);
 | |
|             return;
 | |
|         }
 | |
|         router->active_logs = 0;
 | |
|         pthread_mutex_unlock(&router->lock);
 | |
|         atomic_add(&router->handling_threads, -1);
 | |
|         return;
 | |
|     }
 | |
| 
 | |
|     if (router->master_state == BLRM_GTIDMODE && MYSQL_RESPONSE_ERR(buf))
 | |
|     {
 | |
|         /*
 | |
|          * If we get an error response to the GTID Mode then we
 | |
|          * asusme the server does not support GTID modes and
 | |
|          * continue. The error is saved and replayed to slaves if
 | |
|          * they also request the GTID mode.
 | |
|          */
 | |
|         MXS_ERROR("%s: Master server does not support GTID Mode.",
 | |
|                   router->service->name);
 | |
|     }
 | |
|     else if (router->master_state != BLRM_BINLOGDUMP && MYSQL_RESPONSE_ERR(buf))
 | |
|     {
 | |
|         char* msg_err = NULL;
 | |
|         int msg_len = 0;
 | |
|         int len = gwbuf_length(buf);
 | |
|         unsigned long mysql_errno = extract_field(MYSQL_ERROR_CODE(buf), 16);
 | |
| 
 | |
|         msg_len = len - 7 - 6;      // +7 is where msg starts, 6 is skipped the status message (#42000)
 | |
|         msg_err = (char*)MXS_MALLOC(msg_len + 1);
 | |
| 
 | |
|         if (msg_err)
 | |
|         {
 | |
|             // skip status message only as MYSQL_RESPONSE_ERR(buf) points to GWBUF_DATA(buf) +7
 | |
|             memcpy(msg_err, (char*)(MYSQL_ERROR_MSG(buf) + 6), msg_len);
 | |
| 
 | |
|             /* NULL terminated error string */
 | |
|             *(msg_err + msg_len) = '\0';
 | |
|         }
 | |
| 
 | |
|         MXS_ERROR("%s: Received error: %lu, '%s' from master during '%s' phase "
 | |
|                   "of the master state machine.",
 | |
|                   router->service->name,
 | |
|                   mysql_errno,
 | |
|                   msg_err ? msg_err : "(memory failure)",
 | |
|                   blrm_states[router->master_state]);
 | |
|         gwbuf_free(buf);
 | |
| 
 | |
|         pthread_mutex_lock(&router->lock);
 | |
| 
 | |
|         /* set mysql errno */
 | |
|         router->m_errno = mysql_errno;
 | |
| 
 | |
|         /* set mysql error message */
 | |
|         if (router->m_errmsg)
 | |
|         {
 | |
|             MXS_FREE(router->m_errmsg);
 | |
|         }
 | |
|         router->m_errmsg = msg_err ? msg_err : MXS_STRDUP("(memory failure)");
 | |
| 
 | |
|         router->active_logs = 0;
 | |
|         if (router->reconnect_pending)
 | |
|         {
 | |
|             pthread_mutex_unlock(&router->lock);
 | |
|             atomic_add(&router->handling_threads, -1);
 | |
|             blr_restart_master(router);
 | |
|             return;
 | |
|         }
 | |
|         pthread_mutex_unlock(&router->lock);
 | |
|         atomic_add(&router->handling_threads, -1);
 | |
|         return;
 | |
|     }
 | |
| 
 | |
|     // Start the Slave Protocol registration with Master server
 | |
|     blr_start_master_registration(router, buf);
 | |
| 
 | |
|     // Check whether re-connect to master is needed
 | |
|     if (router->reconnect_pending)
 | |
|     {
 | |
|         blr_restart_master(router);
 | |
|     }
 | |
| 
 | |
|     pthread_mutex_lock(&router->lock);
 | |
|     router->active_logs = 0;
 | |
|     pthread_mutex_unlock(&router->lock);
 | |
|     atomic_add(&router->handling_threads, -1);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Build a MySQL query into a GWBUF that we can send to the master database
 | |
|  *
 | |
|  * The data is not written to @c dcb but the expected protocol state is fixed.
 | |
|  *
 | |
|  * @param   dcb         The DCB where this will be written
 | |
|  * @param   query       The text of the query to send
 | |
|  */
 | |
| static GWBUF* blr_make_query(DCB* dcb, char* query)
 | |
| {
 | |
|     GWBUF* buf;
 | |
|     unsigned char* data;
 | |
|     int len;
 | |
| 
 | |
|     if ((buf = gwbuf_alloc(strlen(query) + MYSQL_HEADER_LEN + 1)) == NULL)
 | |
|     {
 | |
|         return NULL;
 | |
|     }
 | |
|     data = GWBUF_DATA(buf);
 | |
|     len = strlen(query) + 1;
 | |
|     encode_value(&data[0], len, 24);// Payload length
 | |
|     data[3] = 0;                    // Sequence id
 | |
|     // Payload
 | |
|     data[4] = COM_QUERY;            // Command
 | |
|     memcpy(&data[5], query, strlen(query));
 | |
| 
 | |
|     // This is hack to get the result set processing in order for binlogrouter
 | |
|     MySQLProtocol* proto = (MySQLProtocol*)dcb->protocol;
 | |
|     proto->current_command = MXS_COM_QUERY;
 | |
| 
 | |
|     return buf;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Build a MySQL slave registration into a GWBUF that we can send to the
 | |
|  * master database
 | |
|  *
 | |
|  * @param   router  The router instance
 | |
|  * @return  A MySQL Replication registration message in a GWBUF structure
 | |
|  */
 | |
| static GWBUF* blr_make_registration(ROUTER_INSTANCE* router)
 | |
| {
 | |
|     GWBUF* buf;
 | |
|     unsigned char* data;
 | |
|     int len = 18;   // Min size of COM_REGISTER_SLAVE payload
 | |
|     int port = 3306;
 | |
|     int hostname_len = 0;
 | |
| 
 | |
|     // Send router->set_slave_hostname
 | |
|     if (router->set_slave_hostname && router->set_slave_hostname[0])
 | |
|     {
 | |
|         hostname_len = strlen(router->set_slave_hostname);
 | |
|     }
 | |
| 
 | |
|     // Add hostname len
 | |
|     len += hostname_len;
 | |
| 
 | |
|     if ((buf = gwbuf_alloc(len + MYSQL_HEADER_LEN)) == NULL)
 | |
|     {
 | |
|         return NULL;
 | |
|     }
 | |
| 
 | |
|     data = GWBUF_DATA(buf);
 | |
|     encode_value(&data[0], len, 24);                // Payload length
 | |
|     data[3] = 0;                                    // Sequence ID
 | |
|     data[4] = COM_REGISTER_SLAVE;                   // Command
 | |
|     encode_value(&data[5], router->serverid, 32);   // Slave Server ID
 | |
| 
 | |
|     // Point to hostname len offset
 | |
|     data += 9;
 | |
| 
 | |
|     *data++ = hostname_len;                 // Slave hostname length
 | |
| 
 | |
|     // Copy hostname
 | |
|     if (hostname_len)
 | |
|     {
 | |
|         memcpy(data, router->set_slave_hostname, hostname_len);
 | |
|     }
 | |
| 
 | |
|     // Point to user
 | |
|     data += hostname_len;
 | |
|     // Set empty user
 | |
|     *data++ = 0;            // Slave username length
 | |
|     // Set empty password
 | |
|     *data++ = 0;            // Slave password length
 | |
|     // Add port
 | |
|     encode_value(&data[0], port, 16);               // Slave master port, 2 bytes
 | |
|     encode_value(&data[2], 0, 32);                  // Replication rank, 4 bytes
 | |
|     encode_value(&data[6], router->masterid, 32);   // Master server-id, 4 bytes
 | |
| 
 | |
|     // This is hack to get the result set processing in order for binlogrouter
 | |
|     MySQLProtocol* proto = (MySQLProtocol*)router->master->protocol;
 | |
|     proto->current_command = MXS_COM_REGISTER_SLAVE;
 | |
| 
 | |
|     return buf;
 | |
| }
 | |
| 
 | |
| 
 | |
| /**
 | |
|  * Build a Binlog dump command into a GWBUF that we can send to the
 | |
|  * master database
 | |
|  *
 | |
|  * @param   router  The router instance
 | |
|  * @return  A MySQL Replication COM_BINLOG_DUMP message in a GWBUF structure
 | |
|  */
 | |
| static GWBUF* blr_make_binlog_dump(ROUTER_INSTANCE* router)
 | |
| {
 | |
|     GWBUF* buf;
 | |
|     unsigned char* data;
 | |
|     int binlog_file_len = strlen(router->binlog_name);
 | |
|     /* COM_BINLOG_DUMP needs 11 bytes + binlogname (terminating NULL is not required) */
 | |
|     int len = 11 + binlog_file_len;
 | |
| 
 | |
|     if ((buf = gwbuf_alloc(len + MYSQL_HEADER_LEN)) == NULL)
 | |
|     {
 | |
|         return NULL;
 | |
|     }
 | |
|     data = GWBUF_DATA(buf);
 | |
| 
 | |
|     encode_value(&data[0], len, 24);        // Payload length
 | |
|     data[3] = 0;                            // Sequence ID
 | |
|     data[4] = COM_BINLOG_DUMP;              // Command
 | |
|     encode_value(&data[5],
 | |
|                  router->current_pos,
 | |
|                  32);                       // binlog position
 | |
| 
 | |
|     /* With mariadb10 always ask for annotate rows events */
 | |
|     if (router->mariadb10_compat)
 | |
|     {
 | |
|         /* set flag for annotate rows event request */
 | |
|         encode_value(&data[9], BLR_REQUEST_ANNOTATE_ROWS_EVENT, 16);
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         encode_value(&data[9], 0, 16);      // No flag set
 | |
|     }
 | |
| 
 | |
|     encode_value(&data[11],
 | |
|                  router->serverid,
 | |
|                  32);                       // Server-id of MaxScale
 | |
|     memcpy((char*)&data[15],
 | |
|            router->binlog_name,
 | |
|            binlog_file_len);                // binlog filename
 | |
| 
 | |
|     // This is hack to get the result set processing in order for binlogrouter
 | |
|     MySQLProtocol* proto = (MySQLProtocol*)router->master->protocol;
 | |
|     proto->current_command = MXS_COM_BINLOG_DUMP;
 | |
| 
 | |
|     return buf;
 | |
| }
 | |
| 
 | |
| 
 | |
/**
 * Encode a value into a number of bits in a MySQL packet
 *
 * The value is written least significant byte first, one byte for every
 * started group of 8 bits. Nothing is written when @c len is zero or
 * negative.
 *
 * @param   data    Point to location in target packet
 * @param   value   The value to pack
 * @param   len     Number of bits to encode value into
 */
void encode_value(unsigned char* data, unsigned int value, int len)
{
    unsigned char* out = data;

    for (int bits_left = len; bits_left > 0; bits_left -= 8)
    {
        *out++ = (unsigned char)(value & 0xff);
        value >>= 8;
    }
}
 | |
| 
 | |
| /**
 | |
|  * Check that the stored event checksum matches the calculated checksum
 | |
|  */
 | |
| static bool verify_checksum(ROUTER_INSTANCE* router, size_t len, uint8_t* ptr)
 | |
| {
 | |
|     bool rval = true;
 | |
|     uint32_t offset = MYSQL_HEADER_LEN + 1;
 | |
|     uint32_t size = len - (offset + MYSQL_CHECKSUM_LEN);
 | |
| 
 | |
|     uint32_t checksum = crc32(0L, ptr + offset, size);
 | |
|     uint32_t pktsum = EXTRACT32(ptr + offset + size);
 | |
| 
 | |
|     if (pktsum != checksum)
 | |
|     {
 | |
|         rval = false;
 | |
|         MXS_ERROR("%s: Checksum error in event from master, "
 | |
|                   "binlog %s @ %lu. Closing master connection.",
 | |
|                   router->service->name,
 | |
|                   router->binlog_name,
 | |
|                   router->current_pos);
 | |
|         router->stats.n_badcrc++;
 | |
|     }
 | |
| 
 | |
|     return rval;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * @brief Reset router errors
 | |
|  *
 | |
|  * @param router Router instance
 | |
|  * @param hdr Replication header
 | |
|  */
 | |
| static void reset_errors(ROUTER_INSTANCE* router, REP_HEADER* hdr)
 | |
| {
 | |
|     pthread_mutex_lock(&router->lock);
 | |
| 
 | |
|     /* set mysql errno to 0 */
 | |
|     router->m_errno = 0;
 | |
| 
 | |
|     /* Remove error message */
 | |
|     if (router->m_errmsg)
 | |
|     {
 | |
|         MXS_FREE(router->m_errmsg);
 | |
|     }
 | |
|     router->m_errmsg = NULL;
 | |
| 
 | |
|     pthread_mutex_unlock(&router->lock);
 | |
| #ifdef SHOW_EVENTS
 | |
|     printf("blr: event type 0x%02x, flags 0x%04x, "
 | |
|            "event size %d, event timestamp %" PRIu32 "\n",
 | |
|            hdr->event_type,
 | |
|            hdr->flags,
 | |
|            hdr->event_size,
 | |
|            hdr->timestamp);
 | |
| #endif
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * blr_handle_binlog_record - we have received binlog records from
 | |
|  * the master and we must now work out what to do with them.
 | |
|  *
 | |
|  * @param router    The router instance
 | |
|  * @param pkt       The binlog records
 | |
|  */
 | |
| void blr_handle_binlog_record(ROUTER_INSTANCE* router, GWBUF* pkt)
 | |
| {
 | |
|     uint8_t* msg = NULL;
 | |
|     REP_HEADER hdr;
 | |
|     uint32_t len = 0;
 | |
|     int check_packet_len;
 | |
|     int semisync_bytes;
 | |
|     int semi_sync_send_ack = 0;
 | |
| 
 | |
|     /*
 | |
|      * Loop over all the packets while we still have some data
 | |
|      * and the packet length is enough to hold a replication event
 | |
|      * header.
 | |
|      */
 | |
|     while (pkt)
 | |
|     {
 | |
|         uint8_t* ptr = GWBUF_DATA(pkt);
 | |
|         len = gw_mysql_get_byte3(ptr);
 | |
|         semisync_bytes = 0;
 | |
| 
 | |
|         /*
 | |
|          * ptr now points at the current message in a contiguous buffer,
 | |
|          * this buffer is either within the GWBUF or in a malloc'd
 | |
|          * copy if the message straddles GWBUF's.
 | |
|          */
 | |
| 
 | |
|         if (len < BINLOG_EVENT_HDR_LEN && router->master_event_state != BLR_EVENT_ONGOING)
 | |
|         {
 | |
|             const char* event_msg = "unknown";
 | |
| 
 | |
|             /* Packet is too small to be a binlog event */
 | |
|             if (ptr[4] == 0xfe)     /* EOF Packet */
 | |
|             {
 | |
|                 event_msg = "end of file";
 | |
|             }
 | |
|             else if (ptr[4] == 0xff)    /* EOF Packet */
 | |
|             {
 | |
|                 event_msg = "error";
 | |
|             }
 | |
|             MXS_NOTICE("Non-event message (%s) from master.", event_msg);
 | |
|             gwbuf_free(pkt);
 | |
|             break;
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|             if (router->master_event_state == BLR_EVENT_DONE)
 | |
|             {
 | |
|                 /** This is the start of a new event */
 | |
|                 pthread_mutex_lock(&router->lock);
 | |
|                 router->stats.n_binlogs++;
 | |
|                 router->stats.n_binlogs_ses++;
 | |
|                 pthread_mutex_unlock(&router->lock);
 | |
| 
 | |
|                 /* Check for semi-sync in event with OK byte[4]:
 | |
|                  * move pointer 2 bytes ahead and set check_packet_len accordingly
 | |
|                  */
 | |
|                 if (ptr[4] == 0 && router->master_semi_sync != MASTER_SEMISYNC_NOT_AVAILABLE
 | |
|                     && ptr[5] == BLR_MASTER_SEMI_SYNC_INDICATOR)
 | |
|                 {
 | |
| 
 | |
|                     check_packet_len = MASTER_BYTES_BEFORE_EVENT_SEMI_SYNC;
 | |
|                     semi_sync_send_ack = ptr[6];
 | |
| 
 | |
|                     /* Extract header from the semi-sync event */
 | |
|                     blr_extract_header_semisync(ptr, &hdr);
 | |
| 
 | |
|                     /** Remove the semi-sync bytes */
 | |
|                     memmove(ptr, ptr + 2, 5);
 | |
|                     ptr += 2;
 | |
|                     semisync_bytes = 2;
 | |
|                 }
 | |
|                 else
 | |
|                 {
 | |
|                     semi_sync_send_ack = 0;
 | |
|                     check_packet_len = MASTER_BYTES_BEFORE_EVENT;
 | |
| 
 | |
|                     /* Extract header from thr event */
 | |
|                     blr_extract_header(ptr, &hdr);
 | |
|                 }
 | |
| 
 | |
|                 /* Sanity check */
 | |
|                 if (hdr.ok == 0)
 | |
|                 {
 | |
|                     if (hdr.event_size != len - (check_packet_len - MYSQL_HEADER_LEN)
 | |
|                         && (hdr.event_size + (check_packet_len - MYSQL_HEADER_LEN)) < MYSQL_PACKET_LENGTH_MAX)
 | |
|                     {
 | |
|                         MXS_ERROR("Packet length is %d, but event size is %d, "
 | |
|                                   "binlog file %s position %lu.",
 | |
|                                   len,
 | |
|                                   hdr.event_size,
 | |
|                                   router->binlog_name,
 | |
|                                   router->current_pos);
 | |
| 
 | |
|                         blr_log_packet(LOG_ERR, "Packet:", ptr, len);
 | |
| 
 | |
|                         if (msg)
 | |
|                         {
 | |
|                             MXS_FREE(msg);
 | |
|                             /* msg = NULL; Not needed unless msg will be referred to again */
 | |
|                         }
 | |
| 
 | |
|                         break;
 | |
|                     }
 | |
| 
 | |
|                     /**
 | |
|                      * This is the first (and possibly last) packet of a replication
 | |
|                      * event. We store the header in case the event is large and
 | |
|                      * it is transmitted over multiple network packets.
 | |
|                      */
 | |
|                     router->master_event_state = BLR_EVENT_STARTED;
 | |
|                     memcpy(&router->stored_header, &hdr, sizeof(hdr));
 | |
|                     reset_errors(router, &hdr);
 | |
|                 }
 | |
|                 else
 | |
|                 {
 | |
|                     /* Terminate replication and exit from main loop */
 | |
|                     blr_terminate_master_replication(router, ptr, len);
 | |
| 
 | |
|                     gwbuf_free(pkt);
 | |
|                     pkt = NULL;
 | |
| 
 | |
|                     break;
 | |
|                 }
 | |
|             }
 | |
|             else
 | |
|             {
 | |
|                 /** We're processing a multi-packet replication event */
 | |
|                 mxb_assert(router->master_event_state == BLR_EVENT_ONGOING);
 | |
|             }
 | |
| 
 | |
|             /** Gather the event into one big buffer */
 | |
|             GWBUF* part = gwbuf_split(&pkt, len + MYSQL_HEADER_LEN);
 | |
| 
 | |
|             if (semisync_bytes)
 | |
|             {
 | |
|                 /** Consume the two semi-sync bytes */
 | |
|                 part = gwbuf_consume(part, semisync_bytes);
 | |
|             }
 | |
| 
 | |
|             mxb_assert(router->master_event_state == BLR_EVENT_STARTED
 | |
|                        || router->master_event_state == BLR_EVENT_ONGOING);
 | |
| 
 | |
|             if (router->master_event_state == BLR_EVENT_ONGOING)
 | |
|             {
 | |
|                 /**
 | |
|                  * Consume the network header so that we can append the raw
 | |
|                  * event data to the original buffer. This allows both checksum
 | |
|                  * calculations and encryption to process it as a contiguous event
 | |
|                  */
 | |
|                 part = gwbuf_consume(part, MYSQL_HEADER_LEN);
 | |
|             }
 | |
| 
 | |
|             router->stored_event = gwbuf_append(router->stored_event, part);
 | |
| 
 | |
|             if (len < MYSQL_PACKET_LENGTH_MAX)
 | |
|             {
 | |
|                 /**
 | |
|                  * This is either the only packet for the event or the last
 | |
|                  * packet in a series for this event. The buffer now contains
 | |
|                  * the network header of the first packet (4 bytes) and one OK byte.
 | |
|                  * The semi-sync bytes are always consumed at an earlier stage.
 | |
|                  */
 | |
|                 mxb_assert(router->master_event_state != BLR_EVENT_DONE);
 | |
| 
 | |
|                 if (router->master_event_state != BLR_EVENT_STARTED)
 | |
|                 {
 | |
|                     /**
 | |
|                      * This is not the first packet for this event. We must use
 | |
|                      * the stored header.
 | |
|                      */
 | |
|                     memcpy(&hdr, &router->stored_header, sizeof(hdr));
 | |
|                 }
 | |
| 
 | |
|                 /** The event is now complete */
 | |
|                 router->master_event_state = BLR_EVENT_DONE;
 | |
|             }
 | |
|             else
 | |
|             {
 | |
|                 /**
 | |
|                  * This packet is a part of a series of packets that contain an
 | |
|                  * event larger than MYSQL_PACKET_LENGTH_MAX bytes.
 | |
|                  *
 | |
|                  * For each partial event chunk, we remove the network header and
 | |
|                  * append it to router->stored_event. The first event is an
 | |
|                  * exception to this and it is appended as-is with the network
 | |
|                  * header and the extra OK byte.
 | |
|                  */
 | |
|                 mxb_assert(len == MYSQL_PACKET_LENGTH_MAX);
 | |
|                 router->master_event_state = BLR_EVENT_ONGOING;
 | |
|                 continue;
 | |
|             }
 | |
| 
 | |
|             /**
 | |
|              *  We now have the complete event in one contiguous buffer:
 | |
|              *  router->master_event_state is BLR_EVENT_DONE
 | |
|              */
 | |
|             router->stored_event = gwbuf_make_contiguous(router->stored_event);
 | |
|             MXS_ABORT_IF_NULL(router->stored_event);
 | |
| 
 | |
|             ptr = GWBUF_DATA(router->stored_event);
 | |
| 
 | |
|             /**
 | |
|              * len is now the length of the complete event plus 4 bytes of network
 | |
|              * header and one OK byte. Semi-sync bytes are never stored.
 | |
|              */
 | |
|             len = gwbuf_length(router->stored_event);
 | |
| 
 | |
|             /**
 | |
|              * If checksums are enabled, verify that the stored checksum
 | |
|              * matches the one we calculated
 | |
|              */
 | |
|             if (router->master_chksum && !verify_checksum(router, len, ptr))
 | |
|             {
 | |
|                 MXS_FREE(msg);
 | |
|                 blr_master_close(router);
 | |
|                 blr_start_master_in_main(router);
 | |
|                 return;
 | |
|             }
 | |
| 
 | |
|             if (hdr.ok == 0)
 | |
|             {
 | |
|                 if (!blr_handle_one_event(router, hdr, ptr, len, semi_sync_send_ack))
 | |
|                 {
 | |
|                     gwbuf_free(pkt);
 | |
|                     return;
 | |
|                 }
 | |
|             }
 | |
|             else
 | |
|             {
 | |
|                 blr_terminate_master_replication(router, ptr, len);
 | |
|             }
 | |
| 
 | |
|             /** Finished processing the event */
 | |
|             gwbuf_free(router->stored_event);
 | |
|             router->stored_event = NULL;
 | |
|         }
 | |
| 
 | |
|         if (msg)
 | |
|         {
 | |
|             MXS_FREE(msg);
 | |
|             msg = NULL;
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     blr_file_flush(router);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Populate a header structure for a replication message from a GWBUF structure.
 | |
|  *
 | |
|  * @param pkt   The incoming packet in a GWBUF chain
 | |
|  * @param hdr   The packet header to populate
 | |
|  */
 | |
| void blr_extract_header(register uint8_t* ptr, register REP_HEADER* hdr)
 | |
| {
 | |
| 
 | |
|     hdr->payload_len = EXTRACT24(ptr);
 | |
|     hdr->seqno = ptr[3];
 | |
|     hdr->ok = ptr[4];
 | |
|     hdr->timestamp = EXTRACT32(&ptr[5]);
 | |
|     hdr->event_type = ptr[9];
 | |
|     hdr->serverid = EXTRACT32(&ptr[10]);
 | |
|     hdr->event_size = EXTRACT32(&ptr[14]);
 | |
|     hdr->next_pos = EXTRACT32(&ptr[18]);
 | |
|     hdr->flags = EXTRACT16(&ptr[22]);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Process a binlog rotate event.
 | |
|  *
 | |
|  * @param router    The instance of the router
 | |
|  * @param ptr       The packet containing the rotate event
 | |
|  * @param hdr       The replication message header
 | |
|  * @return          1 if the file could be rotated, 0 otherwise.
 | |
|  */
 | |
| int blr_rotate_event(ROUTER_INSTANCE* router, uint8_t* ptr, REP_HEADER* hdr)
 | |
| {
 | |
|     int len, slen;
 | |
|     uint64_t pos;
 | |
|     char file[BINLOG_FNAMELEN + 1];
 | |
| 
 | |
|     ptr += BINLOG_EVENT_HDR_LEN;                    // Skip event header
 | |
|     len = hdr->event_size - BINLOG_EVENT_HDR_LEN;   // Event size minus header
 | |
|     pos = extract_field(ptr + 4, 32);
 | |
|     pos <<= 32;
 | |
|     pos |= extract_field(ptr, 32);
 | |
|     slen = len - (8 + BINLOG_EVENT_CRC_SIZE);   // Allow for position and CRC
 | |
|     if (!router->master_chksum)
 | |
|     {
 | |
|         slen += BINLOG_EVENT_CRC_SIZE;
 | |
|     }
 | |
|     if (slen > BINLOG_FNAMELEN)
 | |
|     {
 | |
|         slen = BINLOG_FNAMELEN;
 | |
|     }
 | |
|     memcpy(file, ptr + 8, slen);
 | |
|     file[slen] = 0;
 | |
| 
 | |
| #ifdef VERBOSE_ROTATE
 | |
|     printf("binlog rotate: ");
 | |
|     while (len--)
 | |
|     {
 | |
|         printf("0x%02x ", *ptr++);
 | |
|     }
 | |
|     printf("\n");
 | |
|     printf("New file: %s @ %ld\n", file, pos);
 | |
| #endif
 | |
| 
 | |
|     strcpy(router->prevbinlog, router->binlog_name);
 | |
| 
 | |
|     int rotated = 1;
 | |
|     int remove_encrytion_ctx = 0;
 | |
| 
 | |
|     /* Different file name in rotate event or missing current binlog file */
 | |
|     if ((strncmp(router->binlog_name, file, slen) != 0)
 | |
|         || !blr_binlog_file_exists(router, NULL))
 | |
|     {
 | |
|         remove_encrytion_ctx = 1;
 | |
|         router->stats.n_rotates++;
 | |
|         if (blr_file_rotate(router, file, pos) == 0)
 | |
|         {
 | |
|             rotated = 0;
 | |
|         }
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         /**
 | |
|          * ROTATE_EVENT reports a binlog file which is the same
 | |
|          * as router->binlog_name.
 | |
|          *
 | |
|          * If mariadb10_gtid is On, let's Add/Update into GTID repo:
 | |
|          * this allows SHOW BINARY LOGS to list all files
 | |
|          * including the ones without GTID events.
 | |
|          */
 | |
| 
 | |
|         if (router->mariadb10_compat
 | |
|             && router->mariadb10_gtid)
 | |
|         {
 | |
|             blr_file_update_gtid(router);
 | |
|         }
 | |
|     }
 | |
|     pthread_mutex_lock(&router->binlog_lock);
 | |
|     router->rotating = 0;
 | |
| 
 | |
|     /* remove current binlog encryption context */
 | |
|     if (remove_encrytion_ctx == 1)
 | |
|     {
 | |
|         MXS_FREE(router->encryption_ctx);
 | |
|         router->encryption_ctx = NULL;
 | |
|     }
 | |
|     pthread_mutex_unlock(&router->binlog_lock);
 | |
|     return rotated;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Create the auth data needed to be able to call dcb_connect.
 | |
|  *
 | |
|  * This doesn't really belong here and should be moved at some stage.
 | |
|  */
 | |
| static void* CreateMySQLAuthData(const char* username, const char* password, const char* database)
 | |
| {
 | |
|     MYSQL_session* auth_info;
 | |
| 
 | |
|     if (username == NULL || password == NULL)
 | |
|     {
 | |
|         MXS_ERROR("You must specify both username and password for the binlog router.");
 | |
|         return NULL;
 | |
|     }
 | |
| 
 | |
|     if (strlen(username) > MYSQL_USER_MAXLEN)
 | |
|     {
 | |
|         MXS_ERROR("Provided user name %s is longer than maximum length %d.",
 | |
|                   username,
 | |
|                   MYSQL_USER_MAXLEN);
 | |
|         return NULL;
 | |
|     }
 | |
| 
 | |
|     if (strlen(database) > MYSQL_DATABASE_MAXLEN)
 | |
|     {
 | |
|         MXS_ERROR("Provided database %s is longer than maximum length %d.",
 | |
|                   database,
 | |
|                   MYSQL_DATABASE_MAXLEN);
 | |
|         return NULL;
 | |
|     }
 | |
| 
 | |
|     if ((auth_info =
 | |
|              static_cast<MYSQL_session*>(MXS_CALLOC(1, sizeof(MYSQL_session)))) == NULL)
 | |
|     {
 | |
|         return NULL;
 | |
|     }
 | |
| 
 | |
|     strcpy(auth_info->user, username);
 | |
|     strcpy(auth_info->db, database);
 | |
|     gw_sha1_str((const uint8_t*)password, strlen(password), auth_info->client_sha1);
 | |
| 
 | |
|     return auth_info;
 | |
| }
 | |
| 
 | |
/** What to do with an event when it is being distributed to a slave */
typedef enum
{
    SLAVE_SEND_EVENT         = 0,   /*< Send the event to the slave */
    SLAVE_FORCE_CATCHUP      = 1,   /*< Force the slave into catchup mode */
    SLAVE_EVENT_ALREADY_SENT = 2    /*< The slave already has the event, don't send it */
} slave_event_action_t;
 | |
| 
 | |
| /**
 | |
|  * Write a raw event (the first 40 bytes at most) to a log file
 | |
|  *
 | |
|  * @param priority The syslog priority of the message (LOG_ERR, LOG_WARNING, etc.)
 | |
|  * @param msg      A textual message to write before the packet
 | |
|  * @param ptr      Pointer to the message buffer
 | |
|  * @param len      Length of message packet
 | |
|  */
 | |
| static void blr_log_packet(int priority, const char* msg, uint8_t* ptr, int len)
 | |
| {
 | |
|     char buf[400] = "";
 | |
|     char* bufp;
 | |
|     int i;
 | |
| 
 | |
|     bufp = buf;
 | |
|     bufp += sprintf(bufp, "%s length = %d: ", msg, len);
 | |
|     for (i = 0; i < len && i < 40; i++)
 | |
|     {
 | |
|         bufp += sprintf(bufp, "0x%02x ", ptr[i]);
 | |
|     }
 | |
|     if (i < len)
 | |
|     {
 | |
|         MXS_LOG_MESSAGE(priority, "%s...", buf);
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         MXS_LOG_MESSAGE(priority, "%s", buf);
 | |
|     }
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Check if the master connection is in place and we
 | |
|  * are downlaoding binlogs
 | |
|  *
 | |
|  * @param router    The router instance
 | |
|  * @return non-zero if we are recivign binlog records
 | |
|  */
 | |
| int blr_master_connected(ROUTER_INSTANCE* router)
 | |
| {
 | |
|     return router->master_state == BLRM_BINLOGDUMP;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Extract a result value from the set of messages that make up a
 | |
|  * MySQL response packet.
 | |
|  *
 | |
|  * @param buf   The GWBUF containing the response
 | |
|  * @param col   The column number to return
 | |
|  * @return  The result form the column or NULL. The caller must free the result
 | |
|  */
 | |
| char* blr_extract_column(GWBUF* buf, int col)
 | |
| {
 | |
|     uint8_t* ptr;
 | |
|     int len, ncol, collen;
 | |
|     char* rval;
 | |
| 
 | |
|     if (buf == NULL)
 | |
|     {
 | |
|         return NULL;
 | |
|     }
 | |
| 
 | |
|     ptr = (uint8_t*)GWBUF_DATA(buf);
 | |
|     /* First packet should be the column count */
 | |
|     len = EXTRACT24(ptr);
 | |
|     ptr += 3;
 | |
|     if (*ptr != 1)      // Check sequence number is 1
 | |
|     {
 | |
|         return NULL;
 | |
|     }
 | |
|     ptr++;
 | |
|     ncol = *ptr++;
 | |
|     if (ncol < col)     // Not that many column in result
 | |
|     {
 | |
|         return NULL;
 | |
|     }
 | |
|     // Now ptr points at the column definition
 | |
|     while (ncol-- > 0)
 | |
|     {
 | |
|         len = EXTRACT24(ptr);
 | |
|         ptr += 4;   // Skip to payload
 | |
|         ptr += len; // Skip over payload
 | |
|     }
 | |
|     // Now we should have an EOF packet
 | |
|     len = EXTRACT24(ptr);
 | |
|     ptr += 4;       // Skip to payload
 | |
|     if (*ptr != 0xfe)
 | |
|     {
 | |
|         return NULL;
 | |
|     }
 | |
|     ptr += len;
 | |
| 
 | |
|     // Finally we have reached the row
 | |
|     len = EXTRACT24(ptr);
 | |
|     ptr += 4;
 | |
| 
 | |
|     /** The first EOF packet signals the start of the resultset rows and the second
 | |
|      *  EOF packet signals the end of the result set. If the resultset
 | |
|      *  contains a second EOF packet right after the first one, the result set is empty and
 | |
|      *  contains no rows. */
 | |
|     if (len == 5 && *ptr == 0xfe)
 | |
|     {
 | |
|         return NULL;
 | |
|     }
 | |
| 
 | |
|     while (--col > 0)
 | |
|     {
 | |
|         collen = *ptr++;
 | |
|         ptr += collen;
 | |
|     }
 | |
|     collen = *ptr++;
 | |
|     if ((rval = static_cast<char*>(MXS_MALLOC(collen + 1))) == NULL)
 | |
|     {
 | |
|         return NULL;
 | |
|     }
 | |
|     memcpy(rval, ptr, collen);
 | |
|     rval[collen] = 0;       // NULL terminate
 | |
| 
 | |
|     return rval;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Stop and start the master connection
 | |
|  *
 | |
|  * @param router    The router instance
 | |
|  */
 | |
| void blr_stop_start_master(ROUTER_INSTANCE* router)
 | |
| {
 | |
|     if (router->master)
 | |
|     {
 | |
|         if (router->master->fd != -1
 | |
|             && router->master->state == DCB_STATE_POLLING)
 | |
|         {
 | |
|             blr_close_master_in_main(router);
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     pthread_mutex_lock(&router->lock);
 | |
| 
 | |
|     router->master_state = BLRM_SLAVE_STOPPED;
 | |
| 
 | |
|     /* set last_safe_pos */
 | |
|     router->last_safe_pos = router->binlog_position;
 | |
| 
 | |
|     /**
 | |
|      * Set router->prevbinlog to router->binlog_name
 | |
|      * The FDE event with current filename may arrive after STOP SLAVE is received
 | |
|      */
 | |
| 
 | |
|     if (strcmp(router->binlog_name, router->prevbinlog) != 0)
 | |
|     {
 | |
|         strcpy(router->prevbinlog, router->binlog_name);    // Same size
 | |
|     }
 | |
| 
 | |
|     if (router->client)
 | |
|     {
 | |
|         if (router->client->fd != -1
 | |
|             && router->client->state == DCB_STATE_POLLING)
 | |
|         {
 | |
|             // Is this dead code? dcb->fd for internal DCBs is always -1
 | |
|             dcb_close(router->client);
 | |
|             router->client = NULL;
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     router->master_state = BLRM_UNCONNECTED;
 | |
|     pthread_mutex_unlock(&router->lock);
 | |
| 
 | |
|     blr_master_reconnect(router);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * The heartbeat check function called from the housekeeper.
 | |
|  * We can try a new master connection if current one is seen out of date
 | |
|  *
 | |
|  * @param router    Current router instance
 | |
|  */
 | |
| 
 | |
| static bool blr_check_last_master_event(void* inst)
 | |
| {
 | |
|     bool rval = true;
 | |
|     ROUTER_INSTANCE* router = (ROUTER_INSTANCE*)inst;
 | |
|     int master_check = 1;
 | |
|     int master_state = BLRM_UNCONNECTED;
 | |
|     char task_name[BLRM_TASK_NAME_LEN + 1] = "";
 | |
| 
 | |
|     pthread_mutex_lock(&router->lock);
 | |
| 
 | |
|     master_check = blr_check_heartbeat(router);
 | |
| 
 | |
|     master_state = router->master_state;
 | |
| 
 | |
|     pthread_mutex_unlock(&router->lock);
 | |
| 
 | |
|     if (!master_check)
 | |
|     {
 | |
|         /*
 | |
|          * stop current master connection
 | |
|          * and try a new connection
 | |
|          */
 | |
|         blr_stop_start_master(router);
 | |
|     }
 | |
| 
 | |
|     if ((!master_check) || (master_state != BLRM_BINLOGDUMP))
 | |
|     {
 | |
|         /*
 | |
|          * Remove the task, it will be added again
 | |
|          * when master state is back to BLRM_BINLOGDUMP
 | |
|          * by blr_master_response()
 | |
|          */
 | |
|         snprintf(task_name,
 | |
|                  BLRM_TASK_NAME_LEN,
 | |
|                  "%s heartbeat",
 | |
|                  router->service->name);
 | |
| 
 | |
|         rval = false;
 | |
|     }
 | |
| 
 | |
|     return rval;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Check last heartbeat or last received event against router->heartbeat time interval
 | |
|  *
 | |
|  * checked interval is againts (router->heartbeat + BLR_NET_LATENCY_WAIT_TIME)
 | |
|  * that is currently set to 1
 | |
|  *
 | |
|  * @param router    Current router instance
 | |
|  * @return          0 if master connection must be closed and opened again, 1 otherwise
 | |
|  */
 | |
| 
 | |
| int blr_check_heartbeat(ROUTER_INSTANCE* router)
 | |
| {
 | |
|     time_t t_now = time(0);
 | |
|     const char* event_desc = NULL;
 | |
| 
 | |
|     if (router->master_state != BLRM_BINLOGDUMP)
 | |
|     {
 | |
|         return 1;
 | |
|     }
 | |
| 
 | |
|     event_desc = blr_last_event_description(router);
 | |
| 
 | |
|     if (router->master_state == BLRM_BINLOGDUMP
 | |
|         && router->lastEventReceived > 0)
 | |
|     {
 | |
|         if (static_cast<unsigned long>(t_now - router->stats.lastReply)
 | |
|             > (router->heartbeat + BLR_NET_LATENCY_WAIT_TIME))
 | |
|         {
 | |
|             MXS_ERROR("No event received from master [%s]:%d in heartbeat period (%lu seconds), "
 | |
|                       "last event (%s %d) received %lu seconds ago. Assuming connection is dead "
 | |
|                       "and reconnecting.",
 | |
|                       router->service->dbref->server->address,
 | |
|                       router->service->dbref->server->port,
 | |
|                       router->heartbeat,
 | |
|                       event_desc != NULL ? event_desc : "unknown",
 | |
|                       router->lastEventReceived,
 | |
|                       t_now - router->stats.lastReply);
 | |
| 
 | |
|             return 0;
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     return 1;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Log binlog router identy after master registration, state is BLRM_BINLOG_DUMP
 | |
|  *
 | |
|  * @param   router  The router instance
 | |
|  */
 | |
| 
 | |
| static void blr_log_identity(ROUTER_INSTANCE* router)
 | |
| {
 | |
|     char* master_uuid = NULL;
 | |
|     char* master_hostname = NULL;
 | |
|     char* master_version = NULL;
 | |
| 
 | |
|     if (router->set_master_version)
 | |
|     {
 | |
|         master_version = MXS_STRDUP(router->set_master_version);
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         master_version = blr_extract_column(router->saved_master.selectver, 1);
 | |
|     }
 | |
| 
 | |
|     if (router->set_master_hostname)
 | |
|     {
 | |
|         master_hostname = MXS_STRDUP(router->set_master_hostname);
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         master_hostname = blr_extract_column(router->saved_master.selecthostname, 1);
 | |
|     }
 | |
| 
 | |
|     if (router->set_master_uuid && router->master_uuid)
 | |
|     {
 | |
|         master_uuid = MXS_STRDUP(router->master_uuid);
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         master_uuid = blr_extract_column(router->saved_master.uuid, 2);
 | |
|     }
 | |
| 
 | |
|     /* Seen by the master */
 | |
|     MXS_NOTICE("%s: identity seen by the master: "
 | |
|                "Server_id: %d, Slave_UUID: %s, Host: %s",
 | |
|                router->service->name,
 | |
|                router->serverid,
 | |
|                router->uuid == NULL ?
 | |
|                "not available" :
 | |
|                router->uuid,
 | |
|                (router->set_slave_hostname && router->set_slave_hostname[0]) ?
 | |
|                router->set_slave_hostname :
 | |
|                "not set");
 | |
| 
 | |
|     /* Seen by the slaves */
 | |
| 
 | |
|     /* MariaDB 5.5 and MariaDB don't have the MASTER_UUID var */
 | |
|     if (master_uuid == NULL)
 | |
|     {
 | |
|         MXS_NOTICE("%s: identity seen by the slaves: "
 | |
|                    "server_id: %d, hostname: %s, MySQL version: %s",
 | |
|                    router->service->name,
 | |
|                    router->masterid,
 | |
|                    (master_hostname == NULL ? "not available" : master_hostname),
 | |
|                    (master_version == NULL ? "not available" : master_version));
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         MXS_NOTICE("%s: identity seen by the slaves: "
 | |
|                    "server_id: %d, uuid: %s, hostname: %s, MySQL version: %s",
 | |
|                    router->service->name,
 | |
|                    router->masterid,
 | |
|                    master_uuid,
 | |
|                    (master_hostname == NULL ? "not available" : master_hostname),
 | |
|                    (master_version == NULL ? "not available" : master_version));
 | |
|     }
 | |
| 
 | |
|     MXS_FREE(master_version);
 | |
|     MXS_FREE(master_hostname);
 | |
|     MXS_FREE(master_uuid);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * @brief Write data into binlogs (incomplete event)
 | |
|  *
 | |
|  * Writes @c data_len bytes of data from @c buf into the current binlog being processed.
 | |
|  *
 | |
|  * @param router Router instance
 | |
|  * @param data_len Number of bytes to write
 | |
|  * @param buf Pointer where the data is read
 | |
|  * @return Number of bytes written or 0 on error
 | |
|  */
 | |
| int blr_write_data_into_binlog(ROUTER_INSTANCE* router, uint32_t data_len, uint8_t* buf)
 | |
| {
 | |
|     int n;
 | |
| 
 | |
|     if ((n = pwrite(router->binlog_fd,
 | |
|                     buf,
 | |
|                     data_len,
 | |
|                     router->last_written)) != static_cast<int64_t>(data_len))
 | |
|     {
 | |
|         MXS_ERROR("%s: Failed to write binlog record at %lu of %s, %s. "
 | |
|                   "Truncating to previous record.",
 | |
|                   router->service->name,
 | |
|                   router->binlog_position,
 | |
|                   router->binlog_name,
 | |
|                   mxs_strerror(errno));
 | |
| 
 | |
|         /* Remove any partial event that was written */
 | |
|         if (ftruncate(router->binlog_fd, router->binlog_position))
 | |
|         {
 | |
|             MXS_ERROR("%s: Failed to truncate binlog record at %lu of %s, %s. ",
 | |
|                       router->service->name,
 | |
|                       router->last_written,
 | |
|                       router->binlog_name,
 | |
|                       mxs_strerror(errno));
 | |
|         }
 | |
|         return 0;
 | |
|     }
 | |
|     router->last_written += data_len;
 | |
|     return n;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Send a replication event packet to a slave
 | |
|  *
 | |
|  * The first replication event packet contains one byte set to either
 | |
|  * 0x0, 0xfe or 0xff which signals what the state of the replication stream is.
 | |
|  * If the data pointed by @c buf is not the start of the replication header
 | |
|  * and part of the replication event is already sent, @c first must be set to
 | |
|  * false so that the first status byte is not sent again.
 | |
|  *
 | |
|  * @param slave Slave where the packet is sent to
 | |
|  * @param buf Buffer containing the data
 | |
|  * @param len Length of the data
 | |
|  * @param first If this is the first packet of a multi-packet event
 | |
|  * @return True on success, false when memory allocation fails
 | |
|  */
 | |
| bool blr_send_packet(ROUTER_SLAVE* slave, uint8_t* buf, uint32_t len, bool first)
 | |
| {
 | |
|     bool rval = true;
 | |
|     unsigned int datalen = len + (first ? 1 : 0);
 | |
|     GWBUF* buffer = gwbuf_alloc(datalen + MYSQL_HEADER_LEN);
 | |
|     if (buffer)
 | |
|     {
 | |
|         uint8_t* data = GWBUF_DATA(buffer);
 | |
|         encode_value(data, datalen, 24);
 | |
|         data += 3;
 | |
|         *data++ = slave->seqno++;
 | |
| 
 | |
|         if (first)
 | |
|         {
 | |
|             *data++ = 0;    // OK byte
 | |
|         }
 | |
| 
 | |
|         if (len > 0)
 | |
|         {
 | |
|             memcpy(data, buf, len);
 | |
|         }
 | |
| 
 | |
|         slave->stats.n_bytes += GWBUF_LENGTH(buffer);
 | |
|         MXS_SESSION_ROUTE_REPLY(slave->dcb->session, buffer);
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         MXS_ERROR("failed to allocate %u bytes of memory when writing an "
 | |
|                   "event.",
 | |
|                   datalen + MYSQL_HEADER_LEN);
 | |
|         rval = false;
 | |
|     }
 | |
|     return rval;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Send a single replication event to a slave
 | |
|  *
 | |
|  * This sends the complete replication event to a slave. If the event size exceeds
 | |
|  * the maximum size of a MySQL packet, it will be sent in multiple packets.
 | |
|  *
 | |
|  * @param role  What is the role of the caller, slave or master.
 | |
|  * @param binlog_name The name of the binlogfile.
 | |
|  * @param binlog_pos The position in the binlogfile.
 | |
|  * @param slave Slave where the event is sent to
 | |
|  * @param hdr   Replication header
 | |
|  * @param buf   Pointer to the replication event as it was read from the disk
 | |
|  * @return True on success, false if memory allocation failed
 | |
|  */
 | |
| bool blr_send_event(blr_thread_role_t role,
 | |
|                     const char* binlog_name,
 | |
|                     uint32_t binlog_pos,
 | |
|                     ROUTER_SLAVE* slave,
 | |
|                     REP_HEADER*   hdr,
 | |
|                     uint8_t* buf)
 | |
| {
 | |
|     bool rval = true;
 | |
| 
 | |
|     if ((strcmp(slave->lsi_binlog_name, binlog_name) == 0)
 | |
|         && (slave->lsi_binlog_pos == binlog_pos))
 | |
|     {
 | |
|         std::stringstream t1;
 | |
|         std::stringstream t2;
 | |
|         t1 << std::this_thread::get_id();
 | |
|         t2 << slave->lsi_sender_tid;
 | |
| 
 | |
|         MXS_ERROR("Slave %s:%i, server-id %d, binlog '%s', position %u: "
 | |
|                   "thread %s in the role of %s could not send the event, "
 | |
|                   "the event has already been sent by thread %s in the role of %s. "
 | |
|                   "%u bytes buffered for writing in DCB %p. %lu events received from master.",
 | |
|                   slave->dcb->remote,
 | |
|                   dcb_get_port(slave->dcb),
 | |
|                   slave->serverid,
 | |
|                   binlog_name,
 | |
|                   binlog_pos,
 | |
|                   t1.str().c_str(),
 | |
|                   ROLETOSTR(role),
 | |
|                   t2.str().c_str(),
 | |
|                   ROLETOSTR(slave->lsi_sender_role),
 | |
|                   gwbuf_length(slave->dcb->writeq),
 | |
|                   slave->dcb,
 | |
|                   slave->router->stats.n_binlogs);
 | |
|         return false;
 | |
|     }
 | |
| 
 | |
|     /** Check if the event and the OK byte fit into a single packet  */
 | |
|     if (hdr->event_size + 1 < MYSQL_PACKET_LENGTH_MAX)
 | |
|     {
 | |
|         rval = blr_send_packet(slave, buf, hdr->event_size, true);
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         /** Total size of all the payloads in all the packets */
 | |
|         int64_t len = hdr->event_size + 1;
 | |
|         bool first = true;
 | |
| 
 | |
|         while (rval && len > 0)
 | |
|         {
 | |
|             uint64_t payload_len = first ? MYSQL_PACKET_LENGTH_MAX - 1 :
 | |
|                 MXS_MIN(MYSQL_PACKET_LENGTH_MAX, len);
 | |
| 
 | |
|             if (blr_send_packet(slave, buf, payload_len, first))
 | |
|             {
 | |
|                 /** The check for exactly 0x00ffffff bytes needs to be done
 | |
|                  * here as well */
 | |
|                 if (len == MYSQL_PACKET_LENGTH_MAX)
 | |
|                 {
 | |
|                     blr_send_packet(slave, buf, 0, false);
 | |
|                 }
 | |
| 
 | |
|                 /** Add the extra byte written by blr_send_packet */
 | |
|                 len -= first ? payload_len + 1 : payload_len;
 | |
|                 buf += payload_len;
 | |
|                 first = false;
 | |
|             }
 | |
|             else
 | |
|             {
 | |
|                 rval = false;
 | |
|             }
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     slave->stats.n_events++;
 | |
| 
 | |
|     if (rval)
 | |
|     {
 | |
|         strcpy(slave->lsi_binlog_name, binlog_name);
 | |
|         slave->lsi_binlog_pos = binlog_pos;
 | |
|         slave->lsi_sender_role = role;
 | |
|         slave->lsi_sender_tid = std::this_thread::get_id();
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         MXS_ERROR("Failed to send an event of %u bytes to slave at [%s]:%d.",
 | |
|                   hdr->event_size,
 | |
|                   slave->dcb->remote,
 | |
|                   dcb_get_port(slave->dcb));
 | |
|     }
 | |
|     return rval;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Stop the slave connection and log errors
 | |
|  *
 | |
|  * @param router Router instance
 | |
|  * @param ptr Pointer to the start of the packet
 | |
|  * @param len Length of the packet payload
 | |
|  */
 | |
| static void blr_terminate_master_replication(ROUTER_INSTANCE* router,
 | |
|                                              uint8_t* ptr,
 | |
|                                              int len)
 | |
| {
 | |
|     // Point to errno: begin + 4 bytes header + 1 byte flag
 | |
|     unsigned long mysql_errno = extract_field(ptr + 5, 16);
 | |
|     // Error message starts at begin + 4 header + 1 flag + 2 bytes errno + 6 bytes status msg
 | |
|     int err_msg_offset = 4 + 1 + 2 + 6;
 | |
|     // Error message size is: len - (err_msg_offset - 4 bytes header)
 | |
|     int msg_len = len - (err_msg_offset - 4);
 | |
|     char* msg_err = (char*)MXS_MALLOC(msg_len + 1);
 | |
|     MXS_ABORT_IF_NULL(msg_err);
 | |
| 
 | |
|     memcpy(msg_err, (char*)ptr + err_msg_offset, msg_len);
 | |
|     *(msg_err + msg_len) = '\0';
 | |
| 
 | |
|     std::string s(msg_err);
 | |
|     pthread_mutex_lock(&router->lock);
 | |
| 
 | |
|     char* old_errmsg = router->m_errmsg;
 | |
|     router->m_errmsg = msg_err;
 | |
|     router->m_errno = mysql_errno;
 | |
|     router->master_state = BLRM_SLAVE_STOPPED;
 | |
|     router->stats.n_binlog_errors++;
 | |
| 
 | |
|     pthread_mutex_unlock(&router->lock);
 | |
| 
 | |
|     MXS_FREE(old_errmsg);
 | |
|     // TODO: Would it or would it not be safe to access router->m_errmsg here?
 | |
|     // TODO: Is the locking above really needed?
 | |
|     MXS_ERROR("Error packet in binlog stream (%s@%lu): %s",
 | |
|               router->binlog_name,
 | |
|               router->current_pos,
 | |
|               s.c_str());
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Populate a header structure for a replication messager
 | |
|  * from a GWBUF structure with semi-sync enabled.
 | |
|  *
 | |
|  * @param pkt   The incoming packet in a GWBUF chain
 | |
|  * @param hdr   The packet header to populate
 | |
|  */
 | |
| static void blr_extract_header_semisync(register uint8_t* ptr, register REP_HEADER* hdr)
 | |
| {
 | |
| 
 | |
|     hdr->payload_len = EXTRACT24(ptr);
 | |
|     hdr->seqno = ptr[3];
 | |
|     hdr->ok = ptr[4];
 | |
|     /* Data available after 2 bytes (the 2 semisync bytes) */
 | |
|     hdr->timestamp = EXTRACT32(&ptr[5 + 2]);
 | |
|     hdr->event_type = ptr[9 + 2];
 | |
|     hdr->serverid = EXTRACT32(&ptr[10 + 2]);
 | |
|     hdr->event_size = EXTRACT32(&ptr[14 + 2]);
 | |
|     hdr->next_pos = EXTRACT32(&ptr[18 + 2]);
 | |
|     hdr->flags = EXTRACT16(&ptr[22 + 2]);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Send a MySQL Replication Semi-Sync ACK to the master server.
 | |
|  *
 | |
|  * @param router The router instance.
 | |
|  * @param pos The binlog position for the ACK reply.
 | |
|  * @return 1 if the packect is sent, 0 on errors
 | |
|  */
 | |
| 
 | |
| int blr_send_semisync_ack(ROUTER_INSTANCE* router, uint64_t pos)
 | |
| {
 | |
|     int seqno = 0;
 | |
|     int semi_sync_flag = BLR_MASTER_SEMI_SYNC_INDICATOR;
 | |
|     GWBUF* buf;
 | |
|     int len;
 | |
|     uint8_t* data;
 | |
|     int binlog_file_len = strlen(router->binlog_name);
 | |
| 
 | |
|     /* payload is: 1 byte semi-sync indicator + 8 bytes position + binlog name len */
 | |
|     len = 1 + 8 + binlog_file_len;
 | |
| 
 | |
|     /* add network header to size */
 | |
|     if ((buf = gwbuf_alloc(len + MYSQL_HEADER_LEN)) == NULL)
 | |
|     {
 | |
|         return 0;
 | |
|     }
 | |
| 
 | |
|     data = GWBUF_DATA(buf);
 | |
| 
 | |
|     encode_value(&data[0], len, 24);    // Payload length
 | |
|     data[3] = 0;                        // Sequence ID
 | |
|     data[4] = semi_sync_flag;           // Semi-sync indicator
 | |
| 
 | |
|     /**
 | |
|      * Next Bytes are: 8 bytes log position + len bin_log filename
 | |
|      */
 | |
| 
 | |
|     /* Position */
 | |
|     encode_value(&data[5], pos, 64);
 | |
| 
 | |
|     /* Binlog filename */
 | |
|     memcpy((char*)&data[13], router->binlog_name, binlog_file_len);
 | |
| 
 | |
|     router->master->func.write(router->master, buf);
 | |
| 
 | |
|     return 1;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Check the master semisync capability.
 | |
|  *
 | |
|  * @param buf The GWBUF data with master reply.
 | |
|  * @return Semisync value: non available, enabled, disabled
 | |
|  */
 | |
| static int blr_get_master_semisync(GWBUF* buf)
 | |
| {
 | |
|     char* key;
 | |
|     char* val = NULL;
 | |
|     int master_semisync = MASTER_SEMISYNC_NOT_AVAILABLE;
 | |
| 
 | |
|     key = blr_extract_column(buf, 1);
 | |
| 
 | |
|     if (key && strlen(key))
 | |
|     {
 | |
|         val = blr_extract_column(buf, 2);
 | |
|     }
 | |
|     MXS_FREE(key);
 | |
| 
 | |
|     if (val)
 | |
|     {
 | |
|         if (strncasecmp(val, "ON", 4) == 0)
 | |
|         {
 | |
|             master_semisync = MASTER_SEMISYNC_ENABLED;
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|             master_semisync = MASTER_SEMISYNC_DISABLED;
 | |
|         }
 | |
|     }
 | |
|     MXS_FREE(val);
 | |
| 
 | |
|     return master_semisync;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Notify all the registered slaves to read from binlog file
 | |
|  * the new events just received
 | |
|  *
 | |
|  * @param   router      The router instance
 | |
|  */
 | |
| void blr_notify_all_slaves(ROUTER_INSTANCE* router)
 | |
| {
 | |
|     ROUTER_SLAVE* slave;
 | |
|     int notified = 0;
 | |
| 
 | |
|     pthread_mutex_lock(&router->lock);
 | |
|     slave = router->slaves;
 | |
|     while (slave)
 | |
|     {
 | |
|         /* Notify a slave that has CS_WAIT_DATA bit set */
 | |
|         if (slave->state == BLRS_DUMPING
 | |
|             && blr_notify_waiting_slave(slave))
 | |
|         {
 | |
|             notified++;
 | |
|         }
 | |
| 
 | |
|         slave = slave->next;
 | |
|     }
 | |
|     pthread_mutex_unlock(&router->lock);
 | |
| 
 | |
|     if (notified > 0)
 | |
|     {
 | |
|         MXS_DEBUG("Notified %d slaves about new data.", notified);
 | |
|     }
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Slave Protocol registration to Master:
 | |
|  *
 | |
|  * Handles previous reply from Master and sends
 | |
|  * SET @master_heartbeat_period to master
 | |
|  *
 | |
|  * The statement is sent only if router->heartbeat > 0
 | |
|  *
 | |
|  * @param router    Current router instance
 | |
|  * @param buf       GWBUF with server reply to previous
 | |
|  *                  registration command
 | |
|  * @return          True if hearbeat request has been sent
 | |
|  *                  false otherwise (router->heartbeat = 0)
 | |
|  */
 | |
| static bool blr_register_heartbeat(ROUTER_INSTANCE* router, GWBUF* buf)
 | |
| {
 | |
|     char query[BLRM_SET_HEARTBEAT_QUERY_LEN];
 | |
|     char* val = blr_extract_column(buf, 2);
 | |
| 
 | |
|     // Response from master should be stored
 | |
|     blr_register_cache_response(router,
 | |
|                                 &router->saved_master.server_id,
 | |
|                                 "serverid",
 | |
|                                 buf);
 | |
| 
 | |
|     /**
 | |
|      * Keep the original master server id
 | |
|      * for any further reference.
 | |
|      */
 | |
|     router->orig_masterid = atoi(val);
 | |
| 
 | |
|     /**
 | |
|      * Set router->masterid from master server-id
 | |
|      * if it's not set by the config option
 | |
|      */
 | |
|     if (router->masterid == 0)
 | |
|     {
 | |
|         router->masterid = atoi(val);
 | |
|     }
 | |
|     MXS_FREE(val);
 | |
| 
 | |
|     /* Send Heartbeat request ony if router->heartbeat is set */
 | |
|     if (router->heartbeat > 0)
 | |
|     {
 | |
|         // Prepare new registration message
 | |
|         sprintf(query,
 | |
|                 "SET @master_heartbeat_period = %lu000000000",
 | |
|                 router->heartbeat);
 | |
|         blr_register_send_command(router, query, BLRM_HBPERIOD);
 | |
|     }
 | |
| 
 | |
|     return router->heartbeat != 0;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Slave Protocol registration to Master:
 | |
|  *
 | |
|  * Handles previous reply from Master and sends
 | |
|  * SET @master_binlog_checksum to master
 | |
|  *
 | |
|  * @param router    Current router instance
 | |
|  * @param buf       GWBUF with server reply to previous
 | |
|  *                  registration command
 | |
|  */
 | |
| static void blr_register_setchecksum(ROUTER_INSTANCE* router, GWBUF* buf)
 | |
| {
 | |
|     /**
 | |
|      * Response from master (set heartbeat reply) should be stored
 | |
|      * only if router->heartbeat is set
 | |
|      */
 | |
|     if (router->heartbeat > 0)
 | |
|     {
 | |
|         blr_register_cache_response(router,
 | |
|                                     &router->saved_master.heartbeat,
 | |
|                                     "heartbeat",
 | |
|                                     buf);
 | |
|     }
 | |
| 
 | |
|     // New registration message
 | |
|     blr_register_send_command(router,
 | |
|                               "SET @master_binlog_checksum ="
 | |
|                               " @@global.binlog_checksum",
 | |
|                               BLRM_CHKSUM1);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Slave Protocol registration to Master:
 | |
|  *
 | |
|  * Handles previous reply from Master and sends
 | |
|  * SELECT @master_binlog_checksum to master
 | |
|  *
 | |
|  * @param router    Current router instance
 | |
|  * @param buf       GWBUF with server reply to previous
 | |
|  *                  registration command
 | |
|  */
 | |
| static void blr_register_getchecksum(ROUTER_INSTANCE* router, GWBUF* buf)
 | |
| {
 | |
|     // Response from master should be stored
 | |
|     blr_register_cache_response(router,
 | |
|                                 &router->saved_master.chksum1,
 | |
|                                 "chksum1",
 | |
|                                 buf);
 | |
|     // New registration message
 | |
|     blr_register_send_command(router,
 | |
|                               "SELECT @master_binlog_checksum",
 | |
|                               BLRM_CHKSUM2);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Slave Protocol registration to Master:
 | |
|  *
 | |
|  * Handles the reply from Master which
 | |
|  * contains the Binlog checksum algorithm in use:
 | |
|  * NONE or CRC32.
 | |
|  *
 | |
|  * @param router    Current router instance
 | |
|  * @param buf       GWBUF with server reply to previous
 | |
|  *                  registration command
 | |
|  */
 | |
| static void blr_register_handle_checksum(ROUTER_INSTANCE* router, GWBUF* buf)
 | |
| {
 | |
|     // Set checksum from master reply
 | |
|     blr_set_checksum(router, buf);
 | |
| 
 | |
|     // Response from master should be stored
 | |
|     blr_register_cache_response(router,
 | |
|                                 &router->saved_master.chksum2,
 | |
|                                 "chksum2",
 | |
|                                 buf);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Slave Protocol registration to Master:
 | |
|  *
 | |
|  * Handles the reply from Master and
 | |
|  * sends SHOW VARIABLES LIKE 'SERVER_UUID' to MySQL 5.6/5.7 Master
 | |
|  *
 | |
|  * @param router    Current router instance
 | |
|  * @param buf       GWBUF with server reply to previous
 | |
|  *                  registration command
 | |
|  */
 | |
| static void blr_register_serveruuid(ROUTER_INSTANCE* router, GWBUF* buf)
 | |
| {
 | |
|     // Response from master should be stored
 | |
|     blr_register_cache_response(router,
 | |
|                                 &router->saved_master.gtid_mode,
 | |
|                                 "gtidmode",
 | |
|                                 buf);
 | |
|     // New registration message
 | |
|     blr_register_send_command(router,
 | |
|                               "SHOW VARIABLES LIKE 'SERVER_UUID'",
 | |
|                               BLRM_MUUID);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Slave Protocol registration to Master:
 | |
|  *
 | |
|  * Handles the SERVER_UUID reply from MySQL 5.6/5.7 Master
 | |
|  *
 | |
|  * @param router    Current router instance
 | |
|  * @param buf       GWBUF with server reply to previous
 | |
|  *                  registration command
 | |
|  */
 | |
| static void blr_register_slaveuuid(ROUTER_INSTANCE* router, GWBUF* buf)
 | |
| {
 | |
|     char* key;
 | |
|     char* val = NULL;
 | |
|     char query[BLRM_MASTER_REGITRATION_QUERY_LEN + 1];
 | |
| 
 | |
|     key = blr_extract_column(buf, 1);
 | |
|     if (key && strlen(key))
 | |
|     {
 | |
|         val = blr_extract_column(buf, 2);
 | |
|     }
 | |
|     if (key)
 | |
|     {
 | |
|         MXS_FREE(key);
 | |
|     }
 | |
| 
 | |
|     /* set the master_uuid from master if not set by the option */
 | |
|     if (!router->set_master_uuid)
 | |
|     {
 | |
|         MXS_FREE(router->master_uuid);
 | |
|         router->master_uuid = val;
 | |
|     }
 | |
| 
 | |
|     // Response from master should be stored
 | |
|     blr_register_cache_response(router,
 | |
|                                 &router->saved_master.uuid,
 | |
|                                 "uuid",
 | |
|                                 buf);
 | |
|     // New registration message
 | |
|     sprintf(query, "SET @slave_uuid='%s'", router->uuid);
 | |
|     blr_register_send_command(router, query, BLRM_SUUID);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Slave Protocol registration to Master:
 | |
|  *
 | |
|  * Sends SET NAMES utf8 to Master
 | |
|  *
 | |
|  * @param router    Current router instance
 | |
|  * @param buf       GWBUF with server reply to previous
 | |
|  *                  registration command
 | |
|  */
 | |
| static void blr_register_utf8(ROUTER_INSTANCE* router, GWBUF* buf)
 | |
| {
 | |
|     // Response from master should be stored
 | |
|     blr_register_cache_response(router,
 | |
|                                 &router->saved_master.setnames,
 | |
|                                 "setnames",
 | |
|                                 buf);
 | |
|     // New registration message
 | |
|     blr_register_send_command(router,
 | |
|                               "SET NAMES utf8",
 | |
|                               BLRM_UTF8);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Slave Protocol registration to Master:
 | |
|  *
 | |
|  * Handles previous reply from Master and
 | |
|  * sends SELECT VERSION() to Master
 | |
|  *
 | |
|  * @param router    Current router instance
 | |
|  * @param buf       GWBUF with server reply to previous
 | |
|  *                  registration command
 | |
|  */
 | |
| static void blr_register_selectversion(ROUTER_INSTANCE* router, GWBUF* buf)
 | |
| {
 | |
|     // Response from master should be stored
 | |
|     blr_register_cache_response(router,
 | |
|                                 &router->saved_master.select1,
 | |
|                                 "select1",
 | |
|                                 buf);
 | |
|     // New registration message
 | |
|     blr_register_send_command(router,
 | |
|                               "SELECT VERSION()",
 | |
|                               BLRM_SELECTVER);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Slave Protocol registration to Master:
 | |
|  *
 | |
|  * Handles previous reply from Master and
 | |
|  * sends SELECT @@version_comment to Master
 | |
|  *
 | |
|  * @param router    Current router instance
 | |
|  * @param buf       GWBUF with server reply to previous
 | |
|  *                  registration command
 | |
|  */
 | |
| static void blr_register_selectvercomment(ROUTER_INSTANCE* router, GWBUF* buf)
 | |
| {
 | |
|     // Response from master should be stored
 | |
|     blr_register_cache_response(router,
 | |
|                                 &router->saved_master.selectver,
 | |
|                                 "selectver",
 | |
|                                 buf);
 | |
|     // New registration message
 | |
|     blr_register_send_command(router,
 | |
|                               "SELECT @@version_comment limit 1",
 | |
|                               BLRM_SELECTVERCOM);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Slave Protocol registration to Master:
 | |
|  *
 | |
|  * Handles previous reply from Master and
 | |
|  * sends SELECT @@version_comment to Master
 | |
|  *
 | |
|  * @param router    Current router instance
 | |
|  * @param buf       GWBUF with server reply to previous
 | |
|  *                  registration command
 | |
|  */
 | |
| static void blr_register_selecthostname(ROUTER_INSTANCE* router, GWBUF* buf)
 | |
| {
 | |
|     // Response from master should be stored
 | |
|     blr_register_cache_response(router,
 | |
|                                 &router->saved_master.selectvercom,
 | |
|                                 "selectvercom",
 | |
|                                 buf);
 | |
|     // New registration message
 | |
|     blr_register_send_command(router,
 | |
|                               "SELECT @@hostname",
 | |
|                               BLRM_SELECTHOSTNAME);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Slave Protocol registration to Master:
 | |
|  *
 | |
|  * Handles previous reply from Master and
 | |
|  * sends SELECT @@max_allowed_packet to Master
 | |
|  *
 | |
|  * @param router    Current router instance
 | |
|  * @param buf       GWBUF with server reply to previous
 | |
|  *                  registration command
 | |
|  */
 | |
| static void blr_register_selectmap(ROUTER_INSTANCE* router, GWBUF* buf)
 | |
| {
 | |
|     // Response from master should be stored
 | |
|     blr_register_cache_response(router,
 | |
|                                 &router->saved_master.selecthostname,
 | |
|                                 "selecthostname",
 | |
|                                 buf);
 | |
|     // New registration message
 | |
|     blr_register_send_command(router,
 | |
|                               "SELECT @@max_allowed_packet",
 | |
|                               BLRM_MAP);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Slave Protocol registration to Master (MaxWell compatibility):
 | |
|  *
 | |
|  * Handles previous reply from Master and
 | |
|  * sends SELECT IF(@@global.log_bin ...) to Master
 | |
|  *
 | |
|  * @param router    Current router instance
 | |
|  * @param buf       GWBUF with server reply to previous
 | |
|  *                  registration command
 | |
|  */
 | |
| static void blr_register_mxw_binlogvars(ROUTER_INSTANCE* router, GWBUF* buf)
 | |
| {
 | |
|     // Response from master should be stored
 | |
|     blr_register_cache_response(router,
 | |
|                                 &router->saved_master.server_vars,
 | |
|                                 "server_vars",
 | |
|                                 buf);
 | |
|     // New registration message
 | |
|     blr_register_send_command(router,
 | |
|                               "SELECT IF(@@global.log_bin, 'ON', 'OFF'), "
 | |
|                               "@@global.binlog_format, @@global.binlog_row_image",
 | |
|                               BLRM_BINLOG_VARS);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Slave Protocol registration to Master (MaxWell compatibility):
 | |
|  *
 | |
|  * Handles previous reply from Master and
 | |
|  * sends select @@lower_case_table_names to Master
 | |
|  *
 | |
|  * @param router    Current router instance
 | |
|  * @param buf       GWBUF with server reply to previous
 | |
|  *                  registration command
 | |
|  */
 | |
| static void blr_register_mxw_tables(ROUTER_INSTANCE* router, GWBUF* buf)
 | |
| {
 | |
|     // Response from master should be stored
 | |
|     blr_register_cache_response(router,
 | |
|                                 &router->saved_master.binlog_vars,
 | |
|                                 "binlog_vars",
 | |
|                                 buf);
 | |
|     // New registration message
 | |
|     blr_register_send_command(router,
 | |
|                               "select @@lower_case_table_names",
 | |
|                               BLRM_LOWER_CASE_TABLES);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Slave protocol registration: check Master SEMI-SYNC replication
 | |
|  *
 | |
|  * Ask master server for Semi-Sync replication capability
 | |
|  *
 | |
|  * Note: Master server must have rpl_semi_sync_master plugin installed
 | |
|  * in order to start the Semi-Sync replication
 | |
|  *
 | |
|  * @param router    Current router instance
 | |
|  * @param buf       The GWBUF to fill with new request
 | |
|  */
 | |
| static void blr_register_getsemisync(ROUTER_INSTANCE* router, GWBUF* buf)
 | |
| {
 | |
|     MXS_NOTICE("%s: checking Semi-Sync replication capability for master server [%s]:%d",
 | |
|                router->service->name,
 | |
|                router->service->dbref->server->address,
 | |
|                router->service->dbref->server->port);
 | |
| 
 | |
|     // New registration message
 | |
|     blr_register_send_command(router,
 | |
|                               "SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled'",
 | |
|                               BLRM_CHECK_SEMISYNC);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Slave protocol registration: handle SEMI-SYNC replication
 | |
|  *
 | |
|  * Get master semisync capability
 | |
|  * and if installed start the SEMI-SYNC replication
 | |
|  *
 | |
|  * @param router    Current router instance
 | |
|  * @param buf       GWBUF with server reply to previous
 | |
|  *                  registration command
 | |
|  * @return          True is semi-sync can be started or false
 | |
|  */
 | |
| static bool blr_register_setsemisync(ROUTER_INSTANCE* router, GWBUF* buf)
 | |
| {
 | |
|     if (router->master_state == BLRM_CHECK_SEMISYNC)
 | |
|     {
 | |
|         /* Get master semi-sync installed, enabled, disabled */
 | |
|         router->master_semi_sync = blr_get_master_semisync(buf);
 | |
| 
 | |
|         /* Discard buffer */
 | |
|         gwbuf_free(buf);
 | |
| 
 | |
|         if (router->master_semi_sync == MASTER_SEMISYNC_NOT_AVAILABLE)
 | |
|         {
 | |
|             /* not installed */
 | |
|             MXS_NOTICE("%s: master server [%s]:%d doesn't have semi_sync capability",
 | |
|                        router->service->name,
 | |
|                        router->service->dbref->server->address,
 | |
|                        router->service->dbref->server->port);
 | |
| 
 | |
|             /* Continue without semisync */
 | |
|             router->master_state = BLRM_REQUEST_BINLOGDUMP;
 | |
| 
 | |
|             return false;
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|             if (router->master_semi_sync == MASTER_SEMISYNC_DISABLED)
 | |
|             {
 | |
|                 /* Installed but not enabled, right now */
 | |
|                 MXS_NOTICE("%s: master server [%s]:%d doesn't have semi_sync"
 | |
|                            " enabled right now, Request Semi-Sync Replication anyway",
 | |
|                            router->service->name,
 | |
|                            router->service->dbref->server->address,
 | |
|                            router->service->dbref->server->port);
 | |
|             }
 | |
|             else
 | |
|             {
 | |
|                 /* Installed and enabled */
 | |
|                 MXS_NOTICE("%s: master server [%s]:%d has semi_sync enabled,"
 | |
|                            " Requesting Semi-Sync Replication",
 | |
|                            router->service->name,
 | |
|                            router->service->dbref->server->address,
 | |
|                            router->service->dbref->server->port);
 | |
|             }
 | |
| 
 | |
|             /* Request semisync */
 | |
|             blr_register_send_command(router,
 | |
|                                       "SET @rpl_semi_sync_slave = 1",
 | |
|                                       BLRM_REQUEST_SEMISYNC);
 | |
|             return true;
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     return false;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Slave Protocol registration to Master (MaxWell compatibility):
 | |
|  *
 | |
|  * Handles previous reply from Master and
 | |
|  * sets the state to BLRM_REGISTER_READY
 | |
|  *
 | |
|  * @param router    Current router instance
 | |
|  * @param buf       GWBUF with server reply to previous
 | |
|  *                  registration command
 | |
|  */
 | |
| static void blr_register_mxw_handlelowercase(ROUTER_INSTANCE* router,
 | |
|                                              GWBUF* buf)
 | |
| {
 | |
|     // Response from master should be stored
 | |
|     blr_register_cache_response(router,
 | |
|                                 &router->saved_master.lower_case_tables,
 | |
|                                 "lower_case_tables",
 | |
|                                 buf);
 | |
|     // Set the new state
 | |
|     router->master_state = BLRM_REGISTER_READY;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Slave Protocol registration to Master: generic send command
 | |
|  *
 | |
|  * Sends a SQL statement to Master server
 | |
|  *
 | |
|  * @param router     Current router instance
 | |
|  * @param command    The SQL command to send
 | |
|  * @param state      The next registration state value
 | |
|  */
 | |
| static void blr_register_send_command(ROUTER_INSTANCE* router,
 | |
|                                       const char* command,
 | |
|                                       unsigned int state)
 | |
| {
 | |
|     // Create MySQL protocol packet
 | |
|     GWBUF* buf = blr_make_query(router->master, (char*)command);
 | |
|     // Set the next registration phase state
 | |
|     router->master_state = state;
 | |
|     // Send the packet
 | |
|     router->master->func.write(router->master, buf);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Slave Protocol registration to Master:
 | |
|  *
 | |
|  * Saves previous reply from Master
 | |
|  *
 | |
|  * @param router      Current router instance
 | |
|  * @param save_buf    The saved GWBUF to update
 | |
|  * @param save_tag    Tag name for disk writing
 | |
|  * @param in_buf      GWBUF with server reply to previous
 | |
|  *                    registration command
 | |
|  */
 | |
| static void blr_register_cache_response(ROUTER_INSTANCE* router,
 | |
|                                         GWBUF** save_buf,
 | |
|                                         const char* save_tag,
 | |
|                                         GWBUF* in_buf)
 | |
| {
 | |
|     if (*save_buf)
 | |
|     {
 | |
|         gwbuf_free(*save_buf);
 | |
|     }
 | |
|     // New value in memory
 | |
|     *save_buf = in_buf;
 | |
|     // New value saved to disk
 | |
|     blr_cache_response(router, (char*)save_tag, in_buf);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Slave Protocol registration to Master:
 | |
|  *
 | |
|  * Handling the registration process:
 | |
|  *
 | |
|  * Note: some phases are specific to MySQL 5.6/5.7,
 | |
|  * others to MariaDB10.
 | |
|  *
 | |
|  * @param router      Current router instance
 | |
|  * @param in_buf      GWBUF with previous phase server response
 | |
|  */
 | |
| static void blr_start_master_registration(ROUTER_INSTANCE* router, GWBUF* buf)
 | |
| {
 | |
|     switch (router->master_state)
 | |
|     {
 | |
|     case BLRM_TIMESTAMP:
 | |
|         /**
 | |
|          * Previous state was BLRM_TIMESTAMP
 | |
|          * No need to save the server reply
 | |
|          */
 | |
|         gwbuf_free(buf);
 | |
|         blr_register_send_command(router,
 | |
|                                   "SHOW VARIABLES LIKE 'SERVER_ID'",
 | |
|                                   BLRM_SERVERID);
 | |
|         router->retry_count = 0;
 | |
|         break;
 | |
| 
 | |
|     case BLRM_SERVERID:
 | |
|         // If set heartbeat is not being sent, next state is BLRM_HBPERIOD
 | |
|         if (blr_register_heartbeat(router, buf))
 | |
|         {
 | |
|             break;
 | |
|         }
 | |
| 
 | |
|     case BLRM_HBPERIOD:
 | |
|         blr_register_setchecksum(router, buf);
 | |
|         break;
 | |
| 
 | |
|     case BLRM_CHKSUM1:
 | |
|         blr_register_getchecksum(router, buf);
 | |
|         break;
 | |
| 
 | |
|     case BLRM_CHKSUM2:
 | |
|         // Set router->master_chksum based on server reply
 | |
|         blr_register_handle_checksum(router, buf);
 | |
|         // Next state is BLRM_MARIADB10 or BLRM_GTIDMODE
 | |
|         {
 | |
|             unsigned int state = router->mariadb10_compat ?
 | |
|                 BLRM_MARIADB10 :
 | |
|                 BLRM_GTIDMODE;
 | |
|             const char* command = router->mariadb10_compat ?
 | |
|                 "SET @mariadb_slave_capability=4" :
 | |
|                 "SELECT @@GLOBAL.GTID_MODE";
 | |
| 
 | |
|             blr_register_send_command(router, command, state);
 | |
|         }
 | |
|         break;
 | |
| 
 | |
|     case BLRM_MARIADB10:    // MariaDB10 Only
 | |
|         // Save server response
 | |
|         blr_register_cache_response(router,
 | |
|                                     &router->saved_master.mariadb10,
 | |
|                                     "mariadb10",
 | |
|                                     buf);
 | |
|         // Next state is BLRM_MARIADB10_GTID_DOMAIN or BLRM_LATIN1
 | |
|         {
 | |
|             /**
 | |
|              * Always request "gtid_domain_id" to Master server
 | |
|              * if MariaDB 10 Compatibilty is On
 | |
|              */
 | |
|             unsigned int state = router->mariadb10_compat ?
 | |
|                 BLRM_MARIADB10_GTID_DOMAIN :
 | |
|                 BLRM_LATIN1;
 | |
|             const char* command = router->mariadb10_compat ?
 | |
|                 "SELECT @@GLOBAL.gtid_domain_id" :
 | |
|                 "SET NAMES latin1";
 | |
|             blr_register_send_command(router, command, state);
 | |
|         }
 | |
|         break;
 | |
| 
 | |
|     case BLRM_MARIADB10_GTID_DOMAIN:    // MariaDB10 Only
 | |
|         {
 | |
|             // Extract GTID domain
 | |
|             char* val = blr_extract_column(buf, 1);
 | |
|             // Store the Master GTID domain
 | |
|             router->mariadb10_gtid_domain = atol(val);
 | |
|             MXS_FREE(val);
 | |
|             // Don't save the server response
 | |
|             gwbuf_free(buf);
 | |
|         }
 | |
| 
 | |
|         // Next state is BLRM_MARIADB10_REQUEST_GTID or BLRM_LATIN1
 | |
|         if (!router->mariadb10_master_gtid)
 | |
|         {
 | |
|             blr_register_send_command(router,
 | |
|                                       "SET NAMES latin1",
 | |
|                                       BLRM_LATIN1);
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|             blr_register_mariadb_gtid_request(router, buf);
 | |
|         }
 | |
|         break;
 | |
| 
 | |
|     case BLRM_MARIADB10_REQUEST_GTID:   // MariaDB10 Only
 | |
|         // Don't save GTID request
 | |
|         gwbuf_free(buf);
 | |
|         blr_register_send_command(router,
 | |
|                                   "SET @slave_gtid_strict_mode=1",
 | |
|                                   BLRM_MARIADB10_GTID_STRICT);
 | |
|         break;
 | |
| 
 | |
|     case BLRM_MARIADB10_GTID_STRICT:    // MariaDB10 Only
 | |
|         // Don't save GTID strict
 | |
|         gwbuf_free(buf);
 | |
|         blr_register_send_command(router,
 | |
|                                   "SET @slave_gtid_ignore_duplicates=1",
 | |
|                                   BLRM_MARIADB10_GTID_NO_DUP);
 | |
|         break;
 | |
| 
 | |
|     case BLRM_MARIADB10_GTID_NO_DUP:    // MariaDB10 Only
 | |
|         // Don't save GTID ignore
 | |
|         gwbuf_free(buf);
 | |
|         blr_register_send_command(router,
 | |
|                                   "SET NAMES latin1",
 | |
|                                   BLRM_LATIN1);
 | |
|         break;
 | |
| 
 | |
|     case BLRM_GTIDMODE:     // MySQL 5.6/5.7 only
 | |
|         blr_register_serveruuid(router, buf);
 | |
|         break;
 | |
| 
 | |
|     case BLRM_MUUID:    // MySQL 5.6/5.7 only
 | |
|         blr_register_slaveuuid(router, buf);
 | |
|         break;
 | |
| 
 | |
|     case BLRM_SUUID:    // MySQL 5.6/5.7 only
 | |
|         // Save server response
 | |
|         blr_register_cache_response(router,
 | |
|                                     &router->saved_master.setslaveuuid,
 | |
|                                     "ssuuid",
 | |
|                                     buf);
 | |
|         blr_register_send_command(router,
 | |
|                                   "SET NAMES latin1",
 | |
|                                   BLRM_LATIN1);
 | |
|         break;
 | |
| 
 | |
|     case BLRM_LATIN1:
 | |
|         blr_register_utf8(router, buf);
 | |
|         break;
 | |
| 
 | |
|     case BLRM_UTF8:
 | |
|         // Save server response
 | |
|         blr_register_cache_response(router,
 | |
|                                     &router->saved_master.utf8,
 | |
|                                     "utf8",
 | |
|                                     buf);
 | |
|         // Next state is MAXWELL BLRM_RESULTS_CHARSET or BLRM_SELECT1
 | |
|         {
 | |
|             unsigned int state = router->maxwell_compat ?
 | |
|                 BLRM_RESULTS_CHARSET :
 | |
|                 BLRM_SELECT1;
 | |
|             const char* command = router->maxwell_compat ?
 | |
|                 "SET character_set_results = NULL" :
 | |
|                 "SELECT 1";
 | |
| 
 | |
|             blr_register_send_command(router, command, state);
 | |
|             break;
 | |
|         }
 | |
| 
 | |
|     case BLRM_RESULTS_CHARSET:  // MAXWELL only
 | |
|         gwbuf_free(buf);        // Discard server reply, don't save it
 | |
|         blr_register_send_command(router,
 | |
|                                   MYSQL_CONNECTOR_SQL_MODE_QUERY,
 | |
|                                   BLRM_SQL_MODE);
 | |
|         break;
 | |
| 
 | |
|     case BLRM_SQL_MODE:     // MAXWELL only
 | |
|         gwbuf_free(buf);    // Discard server reply, don't save it
 | |
|         blr_register_send_command(router,
 | |
|                                   "SELECT 1",
 | |
|                                   BLRM_SELECT1);
 | |
|         break;
 | |
| 
 | |
|     case BLRM_SELECT1:
 | |
|         blr_register_selectversion(router, buf);
 | |
|         break;
 | |
| 
 | |
|     case BLRM_SELECTVER:
 | |
|         blr_register_selectvercomment(router, buf);
 | |
|         break;
 | |
| 
 | |
|     case BLRM_SELECTVERCOM:
 | |
|         blr_register_selecthostname(router, buf);
 | |
|         break;
 | |
| 
 | |
|     case BLRM_SELECTHOSTNAME:
 | |
|         blr_register_selectmap(router, buf);
 | |
|         break;
 | |
| 
 | |
|     case BLRM_MAP:
 | |
|         // Save server response
 | |
|         blr_register_cache_response(router,
 | |
|                                     &router->saved_master.map,
 | |
|                                     "map",
 | |
|                                     buf);
 | |
|         if (router->maxwell_compat)
 | |
|         {
 | |
|             blr_register_send_command(router,
 | |
|                                       MYSQL_CONNECTOR_SERVER_VARS_QUERY,
 | |
|                                       BLRM_SERVER_VARS);
 | |
|             break;
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|             // Continue: ready for the registration, nothing to write/read
 | |
|             router->master_state = BLRM_REGISTER_READY;
 | |
|         }
 | |
| 
 | |
|     case BLRM_SERVER_VARS:      // MAXWELL only
 | |
|         /**
 | |
|          * This branch could be reached as fallthrough from BLRM_MAP
 | |
|          * with new state BLRM_REGISTER_READY
 | |
|          * Go ahead if maxwell_compat is not set
 | |
|          */
 | |
|         if (router->master_state == BLRM_SERVER_VARS && router->maxwell_compat)
 | |
|         {
 | |
|             blr_register_mxw_binlogvars(router, buf);
 | |
|             break;
 | |
|         }
 | |
| 
 | |
|     case BLRM_BINLOG_VARS:      // MAXWELL only
 | |
|         /**
 | |
|          * This branch could be reached as fallthrough from BLRM_MAP
 | |
|          * with new state BLRM_REGISTER_READY.
 | |
|          * Go ahead if maxwell_compat is not set
 | |
|          */
 | |
|         if (router->master_state == BLRM_BINLOG_VARS && router->maxwell_compat)
 | |
|         {
 | |
|             blr_register_mxw_tables(router, buf);
 | |
|             break;
 | |
|         }
 | |
| 
 | |
|     case BLRM_LOWER_CASE_TABLES:    // MAXWELL only
 | |
|         /**
 | |
|          * This branch could be reached as fallthrough from BLRM_MAP
 | |
|          * with new state BLRM_REGISTER_READY.
 | |
|          * Go ahead if maxwell_compat is not set
 | |
|          */
 | |
|         if (router->master_state == BLRM_LOWER_CASE_TABLES
 | |
|             && router->maxwell_compat)
 | |
|         {
 | |
|             blr_register_mxw_handlelowercase(router, buf);
 | |
|             // Continue: ready for the registration, nothing to write/read
 | |
|         }
 | |
| 
 | |
|     case BLRM_REGISTER_READY:
 | |
|         // Prepare Slave registration request: COM_REGISTER_SLAVE
 | |
|         buf = blr_make_registration(router);
 | |
|         // Set new state
 | |
|         router->master_state = BLRM_REGISTER;
 | |
|         // Send the packet
 | |
|         router->master->func.write(router->master, buf);
 | |
|         break;
 | |
| 
 | |
|     case BLRM_REGISTER:
 | |
|         /* discard master reply to COM_REGISTER_SLAVE */
 | |
|         gwbuf_free(buf);
 | |
| 
 | |
|         /* if semisync option is set, check for master semi-sync availability */
 | |
|         if (router->request_semi_sync)
 | |
|         {
 | |
|             blr_register_getsemisync(router, buf);
 | |
|             break;
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|             /* Continue */
 | |
|             router->master_state = BLRM_REQUEST_BINLOGDUMP;
 | |
|         }
 | |
| 
 | |
|     case BLRM_CHECK_SEMISYNC:
 | |
|         /**
 | |
|          * This branch could be reached as fallthrough from BLRM_REGISTER
 | |
|          * if request_semi_sync option is false
 | |
|          */
 | |
|         if (router->master_state == BLRM_CHECK_SEMISYNC)
 | |
|         {
 | |
|             if (blr_register_setsemisync(router, buf))
 | |
|             {
 | |
|                 break;
 | |
|             }
 | |
|         }
 | |
| 
 | |
|     case BLRM_REQUEST_SEMISYNC:
 | |
|         /**
 | |
|          * This branch could be reached as fallthrough from BLRM_REGISTER or
 | |
|          * BLRM_CHECK_SEMISYNC if request_semi_sync option is false or
 | |
|          * master doesn't support semisync or it's not enabled.
 | |
|          */
 | |
|         if (router->master_state == BLRM_REQUEST_SEMISYNC)
 | |
|         {
 | |
|             /* discard master reply */
 | |
|             gwbuf_free(buf);
 | |
| 
 | |
|             /* Continue */
 | |
|             router->master_state = BLRM_REQUEST_BINLOGDUMP;
 | |
|         }
 | |
| 
 | |
|     case BLRM_REQUEST_BINLOGDUMP:
 | |
|         /**
 | |
|          * This branch is reached after semi-sync check/request or
 | |
|          * just after sending COM_REGISTER_SLAVE
 | |
|          * if request_semi_sync option is false.
 | |
|          */
 | |
| 
 | |
|         /**
 | |
|          * End of registration process:
 | |
|          *
 | |
|          * Now request a dump of the binlog file: COM_BINLOG_DUMP
 | |
|          */
 | |
|         buf = blr_make_binlog_dump(router);
 | |
|         router->master_state = BLRM_BINLOGDUMP;
 | |
|         router->master->func.write(router->master, buf);
 | |
| 
 | |
|         if (router->binlog_name[0])
 | |
|         {
 | |
|             MXS_NOTICE("%s: Request binlog records from %s at "
 | |
|                        "position %lu from master server [%s]:%d",
 | |
|                        router->service->name,
 | |
|                        router->binlog_name,
 | |
|                        router->current_pos,
 | |
|                        router->service->dbref->server->address,
 | |
|                        router->service->dbref->server->port);
 | |
|         }
 | |
| 
 | |
|         /* Log binlog router identity */
 | |
|         blr_log_identity(router);
 | |
|         break;
 | |
| 
 | |
|     case BLRM_BINLOGDUMP:
 | |
|         /**
 | |
|          * Main body, we have received a binlog record from the master
 | |
|          */
 | |
|         blr_handle_binlog_record(router, buf);
 | |
| 
 | |
|         /**
 | |
|          * Set heartbeat check task
 | |
|          */
 | |
|         if (router->heartbeat > 0)
 | |
|         {
 | |
|             char task_name[BLRM_TASK_NAME_LEN + 1] = "";
 | |
|             snprintf(task_name,
 | |
|                      BLRM_TASK_NAME_LEN,
 | |
|                      "%s heartbeat",
 | |
|                      router->service->name);
 | |
|             hktask_add(task_name,
 | |
|                        blr_check_last_master_event,
 | |
|                        router,
 | |
|                        router->heartbeat);
 | |
|         }
 | |
| 
 | |
|         break;
 | |
|     }
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Slave Protocol registration to Master (MariaDB 10 compatibility):
 | |
|  *
 | |
|  * Handles previous reply from MariaDB10 Master (GTID Domain ID) and
 | |
|  * sends the SET @slave_connect_state='x-y-z' GTID registration.
 | |
|  *
 | |
|  * The next state is set to BLRM_MARIADB10_REQUEST_GTID
 | |
|  *
 | |
|  * @param router    Current router instance
 | |
|  * @param buf       GWBUF with server reply to previous
 | |
|  *                  registration command
 | |
|  */
 | |
| static void blr_register_mariadb_gtid_request(ROUTER_INSTANCE* router,
 | |
|                                               GWBUF* buf)
 | |
| {
 | |
|     const char format_gtid_val[] = "SET @slave_connect_state='%s'";
 | |
| 
 | |
|     // SET the requested GTID
 | |
|     char set_gtid[GTID_MAX_LEN + sizeof(format_gtid_val)];
 | |
|     sprintf(set_gtid,
 | |
|             format_gtid_val,
 | |
|             router->last_mariadb_gtid);
 | |
| 
 | |
|     MXS_INFO("%s: Requesting GTID (%s) from master server.",
 | |
|              router->service->name,
 | |
|              router->last_mariadb_gtid);
 | |
|     // Send the request
 | |
|     blr_register_send_command(router,
 | |
|                               set_gtid,
 | |
|                               BLRM_MARIADB10_REQUEST_GTID);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * The routine hanldes a Fake ROTATE_EVENT
 | |
|  *
 | |
|  * It takes care of any missing file between
 | |
|  * current file and the one in rotate event:
 | |
|  * files with 4 bytes size colud be created.
 | |
|  *
 | |
|  * The filename specified in rotate event
 | |
|  * is created/overwritten
 | |
|  *
 | |
|  * @param router    The router instance
 | |
|  * @param hdr       The Replication event header
 | |
|  * @param ptr       The packet data
 | |
|  * @return          True for succesfull binlog file rotation,
 | |
|  *                  False otherwise.
 | |
|  */
 | |
| bool blr_handle_fake_rotate(ROUTER_INSTANCE* router,
 | |
|                             REP_HEADER* hdr,
 | |
|                             uint8_t* ptr)
 | |
| {
 | |
|     mxb_assert(hdr->event_type == ROTATE_EVENT);
 | |
| 
 | |
|     uint64_t pos;
 | |
|     int len, slen;
 | |
|     char file[BINLOG_FNAMELEN + 1];
 | |
| 
 | |
|     len = hdr->event_size - BINLOG_EVENT_HDR_LEN;   // Event size minus header
 | |
|     slen = len - (8 + BINLOG_EVENT_CRC_SIZE);       // Allow for position and CRC
 | |
|     if (!router->master_chksum)
 | |
|     {
 | |
|         slen += BINLOG_EVENT_CRC_SIZE;
 | |
|     }
 | |
|     if (slen > BINLOG_FNAMELEN)
 | |
|     {
 | |
|         slen = BINLOG_FNAMELEN;
 | |
|     }
 | |
|     memcpy(file, ptr + BINLOG_EVENT_HDR_LEN + 8, slen);
 | |
|     file[slen] = 0;
 | |
| 
 | |
|     pos = extract_field(ptr + BINLOG_EVENT_HDR_LEN + 4, 32);
 | |
|     pos <<= 32;
 | |
|     pos |= extract_field(ptr + BINLOG_EVENT_HDR_LEN, 32);
 | |
| 
 | |
|     MXS_DEBUG("Fake ROTATE_EVENT received: file %s, pos %" PRIu64
 | |
|               ". Next event at pos %" PRIu32,
 | |
|               file,
 | |
|               pos,
 | |
|               hdr->next_pos);
 | |
|     /**
 | |
|      * Detect any missing file in sequence.
 | |
|      */
 | |
|     if (!blr_handle_missing_files(router, file))
 | |
|     {
 | |
|         return false;
 | |
|     }
 | |
| 
 | |
|     pthread_mutex_lock(&router->binlog_lock);
 | |
| 
 | |
|     /* Set writing pos to 4 if Master GTID */
 | |
|     if (router->mariadb10_master_gtid && pos == 4)
 | |
|     {
 | |
|         /**
 | |
|          * If a MariadB 10 Slave is connecting and reading the
 | |
|          * events from this binlog file, the router->binlog_position check
 | |
|          * might fail in blr_slave.c:blr_slave_binlog_dump()
 | |
|          * and the slave connection will be closed.
 | |
|          *
 | |
|          * The slave will automatically try to re-connect.
 | |
|          */
 | |
|         router->last_written = BINLOG_MAGIC_SIZE;
 | |
|         router->current_pos = BINLOG_MAGIC_SIZE;
 | |
|         router->binlog_position = BINLOG_MAGIC_SIZE;
 | |
|         router->last_event_pos = BINLOG_MAGIC_SIZE;
 | |
|         router->current_safe_event = BINLOG_MAGIC_SIZE;
 | |
|     }
 | |
| 
 | |
|     router->rotating = 1;
 | |
| 
 | |
|     pthread_mutex_unlock(&router->binlog_lock);
 | |
| 
 | |
|     return blr_rotate_event(router, ptr, hdr);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * The routine hanldes a Fake GTID_LIST_EVENT, MariaDB 10 only.
 | |
|  *
 | |
|  * Fake MARIADB10_GTID_GTID_LIST_EVENT could be sent
 | |
|  * when using GTID registration with MariaDB 10 server
 | |
|  * The event header 'next_pos' tells where to write
 | |
|  * the next event.
 | |
|  * We set internal pointers to that position.
 | |
|  *
 | |
|  * @param router    The router instance
 | |
|  * @param hdr       The Replication event header
 | |
|  * @param ptr       The packet data
 | |
|  */
 | |
| void blr_handle_fake_gtid_list(ROUTER_INSTANCE* router,
 | |
|                                REP_HEADER* hdr,
 | |
|                                uint8_t* ptr)
 | |
| {
 | |
|     mxb_assert(hdr->event_type == MARIADB10_GTID_GTID_LIST_EVENT);
 | |
| 
 | |
|     if (router->mariadb10_master_gtid)
 | |
|     {
 | |
|         uint64_t binlog_file_eof = lseek(router->binlog_fd, 0L, SEEK_END);
 | |
| 
 | |
|         MXS_INFO("Fake GTID_LIST received: file %s, pos %" PRIu64
 | |
|                  ". Next event at pos %" PRIu32,
 | |
|                  router->binlog_name,
 | |
|                  router->current_pos,
 | |
|                  hdr->next_pos);
 | |
| 
 | |
|         /**
 | |
|          * We could write in any binlog file position:
 | |
|          * fill any GAP with an ignorable event
 | |
|          * if GTID_LIST next_pos is greter than current EOF
 | |
|          */
 | |
|         if (hdr->next_pos && (hdr->next_pos > binlog_file_eof))
 | |
|         {
 | |
|             uint64_t hole_size = hdr->next_pos - binlog_file_eof;
 | |
| 
 | |
|             MXS_INFO("Detected hole while processing"
 | |
|                      " a Fake GTID_LIST Event: hole size will be %"
 | |
|                      PRIu64 " bytes",
 | |
|                      hole_size);
 | |
| 
 | |
|             /* Set the offet for the write routine */
 | |
|             pthread_mutex_lock(&router->binlog_lock);
 | |
| 
 | |
|             router->last_written = binlog_file_eof;
 | |
| 
 | |
|             pthread_mutex_unlock(&router->binlog_lock);
 | |
| 
 | |
|             // Write One Hole
 | |
|             // TODO: write small holes
 | |
|             blr_write_special_event(router,
 | |
|                                     binlog_file_eof,
 | |
|                                     hole_size,
 | |
|                                     hdr,
 | |
|                                     BLRM_IGNORABLE);
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|             // Increment the internal offsets
 | |
|             pthread_mutex_lock(&router->binlog_lock);
 | |
| 
 | |
|             router->last_written = hdr->next_pos;
 | |
|             router->last_event_pos = router->current_pos;
 | |
|             router->current_pos = hdr->next_pos;
 | |
|             router->binlog_position = router->current_pos;
 | |
|             router->current_safe_event = router->current_pos;
 | |
| 
 | |
|             pthread_mutex_unlock(&router->binlog_lock);
 | |
|         }
 | |
|     }
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Detect any missing file in sequence between
 | |
|  * current router->binlog_name and new_file
 | |
|  * in fake ROTATE_EVENT.
 | |
|  *
 | |
|  * In case of missing files, new files with 4 bytes
 | |
|  * will be created up to new_file.
 | |
|  *
 | |
|  * @param    router      The current router
 | |
|  * @param    new_file    The filename in Fake ROTATE_EVENT
 | |
|  * @return               true on success, false on errors
 | |
|  */
 | |
| static bool blr_handle_missing_files(ROUTER_INSTANCE* router,
 | |
|                                      char* new_file)
 | |
| {
 | |
|     char* fptr;
 | |
|     uint32_t new_fseqno;
 | |
|     uint32_t curr_fseqno;
 | |
|     char buf[BLRM_BINLOG_NAME_STR_LEN];
 | |
|     char bigbuf[PATH_MAX + 1];
 | |
| 
 | |
|     if (*new_file
 | |
|         && (fptr = strrchr(new_file, '.')) == NULL)
 | |
|     {
 | |
|         return false;
 | |
|     }
 | |
|     if (router->fileroot)
 | |
|     {
 | |
|         MXS_FREE(router->fileroot);
 | |
|     }
 | |
|     /* set filestem */
 | |
|     router->fileroot = MXS_STRNDUP_A(new_file, fptr - new_file);
 | |
| 
 | |
|     new_fseqno = atol(fptr + 1);
 | |
| 
 | |
|     if (!*router->binlog_name)
 | |
|     {
 | |
|         MXS_INFO("Fake ROTATE_EVENT comes with %s log file."
 | |
|                  " Current router binlog file has not been set yet."
 | |
|                  " Skipping creation of empty files"
 | |
|                  " before sequence %" PRIu32 "",
 | |
|                  new_file,
 | |
|                  new_fseqno);
 | |
|         return true;
 | |
|     }
 | |
| 
 | |
|     if (*router->binlog_name
 | |
|         && (fptr = strrchr(router->binlog_name, '.')) == NULL)
 | |
|     {
 | |
|         return false;
 | |
|     }
 | |
|     curr_fseqno = atol(fptr + 1);
 | |
|     int32_t delta_seq = new_fseqno - (curr_fseqno + 1);
 | |
| 
 | |
|     /**
 | |
|      * Try creating delta_seq empty binlog files:
 | |
|      *
 | |
|      * Note: currenlty working for positive delta
 | |
|      * and same filestem.
 | |
|      */
 | |
|     if (delta_seq > 0)
 | |
|     {
 | |
|         MXS_INFO("Fake ROTATE_EVENT comes with a %" PRIu32
 | |
|                  " delta sequence in its name."
 | |
|                  " Creating %" PRIi32 " empty files",
 | |
|                  delta_seq,
 | |
|                  delta_seq);
 | |
| 
 | |
|         // Create up to (delta_seq - 1) empty (with 4 bytes) binlog files
 | |
|         for (int i = 1; i <= delta_seq; i++)
 | |
|         {
 | |
|             sprintf(buf, BINLOG_NAMEFMT, router->fileroot, curr_fseqno + i);
 | |
|             if (!blr_file_new_binlog(router, buf))
 | |
|             {
 | |
|                 return false;
 | |
|             }
 | |
|             else
 | |
|             {
 | |
|                 MXS_INFO("Created empty binlog file [%d] '%s'"
 | |
|                          " due to Fake ROTATE_EVENT file sequence delta.",
 | |
|                          i,
 | |
|                          buf);
 | |
|             }
 | |
|         }
 | |
| 
 | |
|         // Some files created, return true
 | |
|         return true;
 | |
|     }
 | |
| 
 | |
|     // Did nothing, just return true
 | |
|     return true;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Check the connection retry limit and increment
 | |
|  * by BLR_MASTER_BACKOFF_TIME up to router->retry_interval.
 | |
|  *
 | |
|  * @param router    The current router instance
 | |
|  * @return          The interval to use for next reconnect
 | |
|  *                  or 0 if router->retry_limit has been hit.
 | |
|  */
 | |
| static int blr_check_connect_retry(ROUTER_INSTANCE* router)
 | |
| {
 | |
|     /* Stop reconnection to master */
 | |
|     if (router->retry_count >= router->retry_limit)
 | |
|     {
 | |
|         return -1;
 | |
|     }
 | |
| 
 | |
|     mxb_assert(router->configs.size() > 0);
 | |
|     if (router->config_index < static_cast<int>(router->configs.size() - 1))
 | |
|     {
 | |
|         // We have unused configs; no need to sleep anything at all. We will
 | |
|         // sleep only when we have unsuccessfully cycled through all available
 | |
|         // servers.
 | |
|         return 0;
 | |
|     }
 | |
| 
 | |
|     /* Return the interval for next reconnect */
 | |
|     if (router->retry_count >= router->retry_interval / BLR_MASTER_BACKOFF_TIME)
 | |
|     {
 | |
|         return router->retry_interval;
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         return BLR_MASTER_BACKOFF_TIME * (1 + router->retry_count);
 | |
|     }
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Set checksum value in router instance
 | |
|  *
 | |
|  * @param inst    The router instance
 | |
|  * @param buf     The buffer with checksum value
 | |
|  */
 | |
| void blr_set_checksum(ROUTER_INSTANCE* inst, GWBUF* buf)
 | |
| {
 | |
|     if (buf)
 | |
|     {
 | |
|         char* val = blr_extract_column(buf, 1);
 | |
|         if (val && strncasecmp(val, "NONE", 4) == 0)
 | |
|         {
 | |
|             inst->master_chksum = false;
 | |
|         }
 | |
|         if (val)
 | |
|         {
 | |
|             MXS_FREE(val);
 | |
|         }
 | |
|     }
 | |
| }
 | |
| 
 | |
| void blr_master_set_config(ROUTER_INSTANCE* inst, const ChangeMasterConfig& config)
 | |
| {
 | |
|     SERVICE* service = inst->service;
 | |
|     SERVER* backend_server = service->dbref->server;
 | |
| 
 | |
|     if (!config.host.empty())
 | |
|     {
 | |
|         server_update_address(backend_server, config.host.c_str());
 | |
|     }
 | |
| 
 | |
|     if (config.port)
 | |
|     {
 | |
|         server_update_port(backend_server, config.port);
 | |
|     }
 | |
| 
 | |
|     if (!config.user.empty())
 | |
|     {
 | |
|         MXS_FREE(inst->user);
 | |
|         inst->user = MXS_STRDUP_A(config.user.c_str());
 | |
|     }
 | |
| 
 | |
|     if (!config.password.empty())
 | |
|     {
 | |
|         MXS_FREE(inst->password);
 | |
|         inst->password = MXS_STRDUP_A(config.password.c_str());
 | |
|     }
 | |
| 
 | |
|     inst->ssl_enabled = config.ssl_enabled;
 | |
| 
 | |
|     if (!config.ssl_ca.empty())
 | |
|     {
 | |
|         MXS_FREE(backend_server->server_ssl->ssl_ca_cert);
 | |
|         backend_server->server_ssl->ssl_ca_cert = MXS_STRDUP_A(config.ssl_ca.c_str());
 | |
|         MXS_FREE(inst->ssl_ca);
 | |
|         inst->ssl_ca = MXS_STRDUP_A(config.ssl_ca.c_str());
 | |
|     }
 | |
| 
 | |
|     if (!config.ssl_cert.empty())
 | |
|     {
 | |
|         MXS_FREE(backend_server->server_ssl->ssl_cert);
 | |
|         backend_server->server_ssl->ssl_cert = MXS_STRDUP_A(config.ssl_cert.c_str());
 | |
|         MXS_FREE(inst->ssl_cert);
 | |
|         inst->ssl_cert = MXS_STRDUP_A(config.ssl_cert.c_str());
 | |
|     }
 | |
| 
 | |
|     if (!config.ssl_key.empty())
 | |
|     {
 | |
|         MXS_FREE(backend_server->server_ssl->ssl_key);
 | |
|         backend_server->server_ssl->ssl_key = MXS_STRDUP_A(config.ssl_key.c_str());
 | |
|         MXS_FREE(inst->ssl_key);
 | |
|         inst->ssl_key = MXS_STRDUP_A(config.ssl_key.c_str());
 | |
|     }
 | |
| 
 | |
|     if (!config.ssl_version.empty())
 | |
|     {
 | |
|         if (listener_set_ssl_version(backend_server->server_ssl, config.ssl_version.c_str()) != 0)
 | |
|         {
 | |
|             MXS_ERROR("Found unknown optional parameter value for 'ssl_version' for"
 | |
|                       " service '%s': %s, ignoring it.",
 | |
|                       inst->service->name,
 | |
|                       config.ssl_version.c_str());
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|             inst->ssl_version = MXS_STRDUP_A(config.ssl_version.c_str());
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     if (config.heartbeat_period >= 0)
 | |
|     {
 | |
|         if (inst->heartbeat > 0 && (config.heartbeat_period == 0))
 | |
|         {
 | |
|             blr_log_disabled_heartbeat(inst);
 | |
|         }
 | |
| 
 | |
|         inst->heartbeat = config.heartbeat_period;
 | |
|     }
 | |
| 
 | |
|     if (config.connect_retry > 0)
 | |
|     {
 | |
|         inst->retry_interval = config.connect_retry;
 | |
|     }
 | |
| }
 |