Merge branch 'develop' into firewall

Conflicts:
	server/modules/filter/test/harness_common.c
This commit is contained in:
Markus Makela
2014-11-25 17:56:26 +02:00
127 changed files with 3648 additions and 981 deletions

View File

@ -25,7 +25,10 @@
#include <modutil.h>
#include <mysqlhint.h>
extern int lm_enabled_logfiles_bitmask;
/** Defined in log_manager.cc */
extern int lm_enabled_logfiles_bitmask;
extern size_t log_ses_count[];
extern __thread log_info_t tls_log_info;
/**
* hintparser.c - Find any comment in the SQL packet and look for MAXSCALE

View File

@ -51,7 +51,10 @@
#include <regex.h>
#include <string.h>
extern int lm_enabled_logfiles_bitmask;
/** Defined in log_manager.cc */
extern int lm_enabled_logfiles_bitmask;
extern size_t log_ses_count[];
extern __thread log_info_t tls_log_info;
MODULE_INFO info = {
MODULE_API_FILTER,

View File

@ -24,7 +24,10 @@
#include <string.h>
#include <regex.h>
extern int lm_enabled_logfiles_bitmask;
/** Defined in log_manager.cc */
extern int lm_enabled_logfiles_bitmask;
extern size_t log_ses_count[];
extern __thread log_info_t tls_log_info;
/**
* @file regexfilter.c - a very simple regular expression rewrite filter.

View File

@ -58,7 +58,10 @@
#include <router.h>
#include <dcb.h>
extern int lm_enabled_logfiles_bitmask;
/** Defined in log_manager.cc */
extern int lm_enabled_logfiles_bitmask;
extern size_t log_ses_count[];
extern __thread log_info_t tls_log_info;
MODULE_INFO info = {
MODULE_API_FILTER,
@ -403,7 +406,7 @@ GWBUF *clone = NULL;
if (my_session->residual < 0)
my_session->residual = 0;
}
else if (my_session->active && (ptr = modutil_get_SQL(queue) != NULL))
else if ( my_session->active && (ptr = modutil_get_SQL(queue)) != NULL)
{
if ((my_instance->match == NULL ||
regexec(&my_instance->re, ptr, 0, NULL, 0) == 0) &&

View File

@ -318,16 +318,16 @@ int clientReply(void* ins, void* session, GWBUF* queue)
int fdgets(int fd, char* buff, int size)
{
int i = 0;
if(fd > 0){
while(i < size - 1 && read(fd,&buff[i],1))
{
if(buff[i] == '\n' || buff[i] == '\0')
{
break;
}
i++;
}
}
while(i < size - 1 && read(fd,&buff[i],1))
{
if(buff[i] == '\n' || buff[i] == '\0')
{
break;
}
i++;
}
buff[i] = '\0';
return i;
}
@ -1053,11 +1053,7 @@ int process_opts(int argc, char** argv)
case 'e':
instance.expected = open_file(optarg,0);
if(instance.expected > 0){
printf("Expected output is read from: %s\n",optarg);
}else{
printf("Error: Failed to open file: %s\n",optarg);
}
printf("Expected output is read from: %s\n",optarg);
break;
case 'o':
@ -1083,12 +1079,12 @@ int process_opts(int argc, char** argv)
case 's':
instance.session_count = atoi(optarg);
printf("Sessions: %i\n",instance.session_count);
printf("Sessions: %i ",instance.session_count);
break;
case 't':
instance.thrcount = atoi(optarg);
printf("Threads: %i\n",instance.thrcount);
printf("Threads: %i ",instance.thrcount);
break;
case 'd':
@ -1140,29 +1136,22 @@ int compare_files(int a,int b)
{
char in[4098];
char exp[4098];
int line = 1,ard, brd,running = 1;
int line = 1;
if(a < 1 || b < 1){
printf("Invalid file descriptors: %d %d\n",a,b);
return 1;
}
if(lseek(a,0,SEEK_SET) < 0 ||
lseek(b,0,SEEK_SET) < 0){
printf("Failed lseek() call on file descriptors: %d %d\n",a,b);
return 1;
}
while(running){
memset(in,0,4098);
memset(exp,0,4098);
ard = fdgets(a,in,4098);
brd = fdgets(b,exp,4098);
if(ard == 0 && brd == 0){
break;
}
if(ard == 0 || brd == 0 || strcmp(in,exp)){
while(fdgets(a,in,4098) && fdgets(b,exp,4098)){
if(strcmp(in,exp)){
printf("The files differ at line %d:\n%s\n-------------------------------------\n%s\n",line,in,exp);
return 1;
}

View File

@ -48,7 +48,10 @@
#include <sys/time.h>
#include <regex.h>
extern int lm_enabled_logfiles_bitmask;
/** Defined in log_manager.cc */
extern int lm_enabled_logfiles_bitmask;
extern size_t log_ses_count[];
extern __thread log_info_t tls_log_info;
MODULE_INFO info = {
MODULE_API_FILTER,

View File

@ -33,19 +33,40 @@
#include <buffer.h>
#include <pthread.h>
#include <memlog.h>
#define BINLOG_FNAMELEN 16
#define BLR_PROTOCOL "MySQLBackend"
#define BINLOG_MAGIC { 0xfe, 0x62, 0x69, 0x6e }
#define BINLOG_NAMEFMT "%s.%06d"
#define BINLOG_NAME_ROOT "mysql-bin"
/* How often to call the binlog status function (seconds) */
#define BLR_STATS_FREQ 60
#define BLR_NSTATS_MINUTES 30
/**
* High and Low water marks for the slave dcb. These values can be overriden
* by the router options highwater and lowwater.
*/
#define DEF_LOW_WATER 2000
#define DEF_HIGH_WATER 30000
#define DEF_LOW_WATER 1000
#define DEF_HIGH_WATER 10000
/**
* Default burst sizes for slave catchup
*/
#define DEF_SHORT_BURST 15
#define DEF_LONG_BURST 500
#define DEF_BURST_SIZE 1024000 /* 1 Mb */
/**
* master reconnect backoff constants
* BLR_MASTER_BACKOFF_TIME The increments of the back off time (seconds)
* BLR_MAX_BACKOFF Maximum number of increments to backoff to
*/
#define BLR_MASTER_BACKOFF_TIME 5
#define BLR_MAX_BACKOFF 60
/**
* Some useful macros for examining the MySQL Response packets
*/
@ -56,24 +77,71 @@
#define MYSQL_ERROR_MSG(buf) ((uint8_t *)GWBUF_DATA(buf) + 6)
#define MYSQL_COMMAND(buf) (*((uint8_t *)GWBUF_DATA(buf) + 4))
/**
* Packet header for replication messages
*/
typedef struct rep_header {
int payload_len; /*< Payload length (24 bits) */
uint8_t seqno; /*< Response sequence number */
uint8_t ok; /*< OK Byte from packet */
uint32_t timestamp; /*< Timestamp - start of binlog record */
uint8_t event_type; /*< Binlog event type */
uint32_t serverid; /*< Server id of master */
uint32_t event_size; /*< Size of header, post-header and body */
uint32_t next_pos; /*< Position of next event */
uint16_t flags; /*< Event flags */
} REP_HEADER;
/**
* The binlog record structure. This contains the actual packet read from the binlog
* file.
*/
typedef struct {
unsigned long position; /*< binlog record position for this cache entry */
GWBUF *pkt; /*< The packet received from the master */
REP_HEADER hdr; /*< The packet header */
} BLCACHE_RECORD;
/**
* The binlog cache. A cache exists for each file that hold cached bin log records.
* Caches will be used for all files being read by more than 1 slave.
*/
typedef struct {
BLCACHE_RECORD **records; /*< The actual binlog records */
int current; /*< The next record that will be inserted */
int cnt; /*< The number of records in the cache */
SPINLOCK lock; /*< The spinlock for the cache */
} BLCACHE;
typedef struct blfile {
char binlogname[BINLOG_FNAMELEN+1]; /*< Name of the binlog file */
int fd; /*< Actual file descriptor */
int refcnt; /*< Reference count for file */
BLCACHE *cache; /*< Record cache for this file */
SPINLOCK lock; /*< The file lock */
struct blfile *next; /*< Next file in list */
} BLFILE;
/**
* Slave statistics
*/
typedef struct {
int n_events; /*< Number of events sent */
int n_bursts; /*< Number of bursts sent */
int n_requests; /*< Number of requests received */
int n_flows; /*< Number of flow control restarts */
int n_catchupnr; /*< No. of times catchup resulted in not entering loop */
int n_alreadyupd;
int n_upd;
int n_cb;
int n_cbna;
int n_dcb;
int n_above;
int n_failed_read;
int n_overrun;
int n_actions[3];
int n_events; /*< Number of events sent */
int n_bursts; /*< Number of bursts sent */
int n_requests; /*< Number of requests received */
int n_flows; /*< Number of flow control restarts */
int n_upd;
int n_cb;
int n_cbna;
int n_dcb;
int n_above;
int n_failed_read;
int n_overrun;
int n_actions[3];
uint64_t lastsample;
int minno;
int minavgs[BLR_NSTATS_MINUTES];
} SLAVE_STATS;
/**
@ -89,6 +157,7 @@ typedef struct router_slave {
int binlog_pos; /*< Binlog position for this slave */
char binlogfile[BINLOG_FNAMELEN+1];
/*< Current binlog file for this slave */
BLFILE *file; /*< Currently open binlog file */
int serverid; /*< Server-id of the slave */
char *hostname; /*< Hostname of the slave, if known */
char *user; /*< Username if given */
@ -132,6 +201,9 @@ typedef struct {
uint64_t n_fakeevents; /*< Fake events not written to disk */
uint64_t n_artificial; /*< Artificial events not written to disk */
uint64_t events[0x24]; /*< Per event counters */
uint64_t lastsample;
int minno;
int minavgs[BLR_NSTATS_MINUTES];
} ROUTER_STATS;
/**
@ -153,37 +225,6 @@ typedef struct {
int fde_len; /*< Length of fde_event */
} MASTER_RESPONSES;
/**
* The binlog record structure. This contains the actual packet received from the
* master, the binlog position of the data in the packet, a point to the data and
* the length of the binlog record.
*
* This allows requests for binlog records in the cache to be serviced by simply
* sending the exact same packet as was received by MaxScale from the master.
* Items are written to the backing file as soon as they are received. The binlog
* cache is flushed of old records periodically, releasing the GWBUF's back to the
* free memory pool.
*/
typedef struct {
unsigned long position; /*< binlog record position for this cache entry */
GWBUF *pkt; /*< The packet received from the master */
unsigned char *data; /*< Pointer to the data within the packet */
unsigned int record_len; /*< Binlog record length */
} BLCACHE_RECORD;
/**
* The binlog cache. A cache exists for each file that hold cached bin log records.
* Typically the router will hold two binlog caches, one for the current file and one
* for the previous file.
*/
typedef struct {
char filename[BINLOG_FNAMELEN+1];
BLCACHE_RECORD *first;
BLCACHE_RECORD *current;
int cnt;
} BLCACHE;
/**
* The per instance data for the router.
*/
@ -205,6 +246,8 @@ typedef struct router_instance {
uint8_t lastEventReceived;
GWBUF *residual; /*< Any residual binlog event */
MASTER_RESPONSES saved_master; /*< Saved master responses */
char *binlogdir; /*< The directory with the binlog files */
SPINLOCK binlog_lock; /*< Lock to control update of the binlog position */
char binlog_name[BINLOG_FNAMELEN+1];
/*< Name of the current binlog file */
uint64_t binlog_position;
@ -212,32 +255,25 @@ typedef struct router_instance {
int binlog_fd; /*< File descriptor of the binlog
* file being written
*/
uint64_t last_written; /*< Position of last event written */
char prevbinlog[BINLOG_FNAMELEN+1];
int rotating; /*< Rotation in progress flag */
BLFILE *files; /*< Files used by the slaves */
SPINLOCK fileslock; /*< Lock for the files queue above */
unsigned int low_water; /*< Low water mark for client DCB */
unsigned int high_water; /*< High water mark for client DCB */
BLCACHE *cache[2];
unsigned int short_burst; /*< Short burst for slave catchup */
unsigned int long_burst; /*< Long burst for slave catchup */
unsigned long burst_size; /*< Maximum size of burst to send */
ROUTER_STATS stats; /*< Statistics for this router */
int active_logs;
int reconnect_pending;
int retry_backoff;
int handling_threads;
struct router_instance
*next;
} ROUTER_INSTANCE;
/**
* Packet header for replication messages
*/
typedef struct rep_header {
int payload_len; /*< Payload length (24 bits) */
uint8_t seqno; /*< Response sequence number */
uint8_t ok; /*< OK Byte from packet */
uint32_t timestamp; /*< Timestamp - start of binlog record */
uint8_t event_type; /*< Binlog event type */
uint32_t serverid; /*< Server id of master */
uint32_t event_size; /*< Size of header, post-header and body */
uint32_t next_pos; /*< Position of next event */
uint16_t flags; /*< Event flags */
} REP_HEADER;
/**
* State machine for the master to MaxScale replication
*/
@ -270,21 +306,23 @@ static char *blrm_states[] = { "Unconnected", "Authenticated", "Timestamp retrie
#define BLRS_UNREGISTERED 0x0001
#define BLRS_REGISTERED 0x0002
#define BLRS_DUMPING 0x0003
#define BLRS_ERRORED 0x0004
#define BLRS_MAXSTATE 0x0003
#define BLRS_MAXSTATE 0x0004
static char *blrs_states[] = { "Created", "Unregistered", "Registered",
"Sending binlogs" };
"Sending binlogs", "Errored" };
/**
* Slave catch-up status
*/
#define CS_READING 0x0001
#define CS_INNERLOOP 0x0002
#define CS_UPTODATE 0x0004
#define CS_EXPECTCB 0x0008
#define CS_DIST 0x0010
#define CS_DISTLATCH 0x0020
#define CS_THRDWAIT 0x0040
#define CS_BUSY 0x0100
#define CS_HOLD 0x0200
/**
* MySQL protocol OpCodes needed for replication
@ -347,22 +385,45 @@ static char *blrs_states[] = { "Created", "Unregistered", "Registered",
#define LOG_EVENT_NO_FILTER_F 0x0100
#define LOG_EVENT_MTS_ISOLATE_F 0x0200
/**
* Macros to extract common fields
*/
#define INLINE_EXTRACT 1 /* Set to 0 for debug purposes */
#if INLINE_EXTRACT
#define EXTRACT16(x) (*(uint8_t *)(x) | (*((uint8_t *)(x) + 1) << 8))
#define EXTRACT24(x) (*(uint8_t *)(x) | \
(*((uint8_t *)(x) + 1) << 8) | \
(*((uint8_t *)(x) + 2) << 16))
#define EXTRACT32(x) (*(uint8_t *)(x) | \
(*((uint8_t *)(x) + 1) << 8) | \
(*((uint8_t *)(x) + 2) << 16) | \
(*((uint8_t *)(x) + 3) << 24))
#else
#define EXTRACT16(x) extract_field((x), 16)
#define EXTRACT24(x) extract_field((x), 24)
#define EXTRACT32(x) extract_field((x), 32)
#endif
/*
* Externals within the router
*/
extern void blr_start_master(ROUTER_INSTANCE *);
extern void blr_master_response(ROUTER_INSTANCE *, GWBUF *);
extern void blr_master_reconnect(ROUTER_INSTANCE *);
extern int blr_master_connected(ROUTER_INSTANCE *);
extern int blr_slave_request(ROUTER_INSTANCE *, ROUTER_SLAVE *, GWBUF *);
extern void blr_slave_rotate(ROUTER_SLAVE *slave, uint8_t *ptr);
extern int blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
extern int blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large);
extern void blr_init_cache(ROUTER_INSTANCE *);
extern void blr_file_init(ROUTER_INSTANCE *);
extern int blr_open_binlog(ROUTER_INSTANCE *, char *);
extern void blr_write_binlog_record(ROUTER_INSTANCE *, REP_HEADER *,uint8_t *);
extern void blr_file_rotate(ROUTER_INSTANCE *, char *, uint64_t);
extern void blr_file_flush(ROUTER_INSTANCE *);
extern GWBUF *blr_read_binlog(int, unsigned int, REP_HEADER *);
extern BLFILE *blr_open_binlog(ROUTER_INSTANCE *, char *);
extern GWBUF *blr_read_binlog(ROUTER_INSTANCE *, BLFILE *, unsigned int, REP_HEADER *);
extern void blr_close_binlog(ROUTER_INSTANCE *, BLFILE *);
extern unsigned long blr_file_size(BLFILE *);
#endif

View File

@ -50,7 +50,10 @@
#include <dcb.h>
#include <modinfo.h>
extern int lm_enabled_logfiles_bitmask;
/** Defined in log_manager.cc */
extern int lm_enabled_logfiles_bitmask;
extern size_t log_ses_count[];
extern __thread log_info_t tls_log_info;
static void monitorMain(void *);
@ -74,6 +77,8 @@ static MONITOR_SERVERS *get_candidate_master(MONITOR_SERVERS *);
static MONITOR_SERVERS *set_cluster_master(MONITOR_SERVERS *, MONITOR_SERVERS *, int);
static void disableMasterFailback(void *, int);
static void setNetworkTimeout(void *arg, int type, int value);
static bool mon_status_changed(MONITOR_SERVERS* mon_srv);
static bool mon_print_fail_status(MONITOR_SERVERS* mon_srv);
static MONITOR_OBJECT MyObject = {
startMonitor,
@ -345,6 +350,9 @@ char *server_string;
if (SERVER_IN_MAINT(database->server))
return;
/** Store previous status */
database->mon_prev_status = database->server->status;
if (database->con == NULL || mysql_ping(database->con) != 0)
{
char *dpwd = decryptPassword(passwd);
@ -362,13 +370,7 @@ char *server_string;
if (mysql_real_connect(database->con, database->server->name,
uname, dpwd, NULL, database->server->port, NULL, 0) == NULL)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Monitor was unable to connect to "
"server %s:%d : \"%s\"",
database->server->name,
database->server->port,
mysql_error(database->con))));
free(dpwd);
server_clear_status(database->server, SERVER_RUNNING);
@ -382,8 +384,20 @@ char *server_string;
{
server_set_status(database->server, SERVER_AUTH_ERROR);
}
database->server->node_id = -1;
free(dpwd);
if (mon_status_changed(database) && mon_print_fail_status(database))
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Monitor was unable to connect to "
"server %s:%d : \"%s\"",
database->server->name,
database->server->port,
mysql_error(database->con))));
}
return;
}
else
@ -458,6 +472,8 @@ MONITOR_SERVERS *ptr;
size_t nrounds = 0;
MONITOR_SERVERS *candidate_master = NULL;
int master_stickiness = handle->disableMasterFailback;
int is_cluster=0;
int log_no_members = 1;
if (mysql_thread_init())
{
@ -494,12 +510,14 @@ int master_stickiness = handle->disableMasterFailback;
}
nrounds += 1;
/* reset cluster members counter */
is_cluster=0;
ptr = handle->databases;
while (ptr)
{
unsigned int prev_status = ptr->server->status;
monitorDatabase(handle, ptr);
/* clear bits for non member nodes */
@ -515,17 +533,27 @@ int master_stickiness = handle->disableMasterFailback;
}
/* Log server status change */
if (ptr->server->status != prev_status ||
SERVER_IS_DOWN(ptr->server))
if (mon_status_changed(ptr))
{
LOGIF(LM, (skygw_log_write_flush(
LOGFILE_MESSAGE,
LOGIF(LD, (skygw_log_write_flush(
LOGFILE_DEBUG,
"Backend server %s:%d state : %s",
ptr->server->name,
ptr->server->port,
STRSRVSTATUS(ptr->server))));
}
if (SERVER_IS_DOWN(ptr->server))
{
/** Increase this server'e error count */
ptr->mon_err_count += 1;
}
else
{
/** Reset this server's error count */
ptr->mon_err_count = 0;
}
ptr = ptr->next;
}
@ -571,8 +599,24 @@ int master_stickiness = handle->disableMasterFailback;
}
}
is_cluster++;
ptr = ptr->next;
}
if (is_cluster == 0 && log_no_members) {
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error: there are no cluster members")));
log_no_members = 0;
} else {
if (is_cluster > 0 && log_no_members == 0) {
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Info: found cluster members")));
log_no_members = 1;
}
}
}
}
@ -675,6 +719,13 @@ MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg;
memcpy(&handle->disableMasterFailback, &disable, sizeof(int));
}
/**
* Set the default id to use in the monitor.
*
* @param arg The handle allocated by startMonitor
* @param type The connect timeout type
* @param value The timeout value to set
*/
static void
setNetworkTimeout(void *arg, int type, int value)
{
@ -728,3 +779,51 @@ int new_timeout = max_timeout -1;
break;
}
}
/**
* Check if current monitored server status has changed
*
* @param mon_srv The monitored server
* @return true if status has changed or false
*/
static bool mon_status_changed(
MONITOR_SERVERS* mon_srv)
{
bool succp;
if (mon_srv->mon_prev_status != mon_srv->server->status)
{
succp = true;
}
else
{
succp = false;
}
return succp;
}
/**
* Check if current monitored server has a loggable failure status
*
* @param mon_srv The monitored server
* @return true if failed status can be logged or false
*/
static bool mon_print_fail_status(
MONITOR_SERVERS* mon_srv)
{
bool succp;
int errcount = mon_srv->mon_err_count;
uint8_t modval;
modval = 1<<(MIN(errcount/10, 7));
if (SERVER_IS_DOWN(mon_srv->server) && errcount == 0)
{
succp = true;
}
else
{
succp = false;
}
return succp;
}

View File

@ -638,8 +638,8 @@ size_t nrounds = 0;
if (mon_status_changed(ptr) ||
mon_print_fail_status(ptr))
{
LOGIF(LM, (skygw_log_write_flush(
LOGFILE_MESSAGE,
LOGIF(LD, (skygw_log_write_flush(
LOGFILE_DEBUG,
"Backend server %s:%d state : %s",
ptr->server->name,
ptr->server->port,

View File

@ -44,6 +44,8 @@
* This means both IO and SQL threads are not working on slaves.
* This option is not enabled by default.
* 10/11/14 Massimiliano Pinto Addition of setNetworkTimeout for connect, read, write
* 18/11/14 Massimiliano Pinto One server only in configuration becomes master.
* servers=server1 must be present in mysql_mon and in router sections as well.
*
* @endverbatim
*/
@ -62,7 +64,10 @@
#include <dcb.h>
#include <modinfo.h>
extern int lm_enabled_logfiles_bitmask;
/** Defined in log_manager.cc */
extern int lm_enabled_logfiles_bitmask;
extern size_t log_ses_count[];
extern __thread log_info_t tls_log_info;
static void monitorMain(void *);
@ -381,7 +386,7 @@ char *server_string;
if (SERVER_IN_MAINT(database->server))
return;
/** Store prevous status */
/** Store previous status */
database->mon_prev_status = database->server->status;
if (database->con == NULL || mysql_ping(database->con) != 0)
@ -409,17 +414,6 @@ char *server_string;
{
free(dpwd);
if (mon_print_fail_status(database))
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Monitor was unable to connect to "
"server %s:%d : \"%s\"",
database->server->name,
database->server->port,
mysql_error(database->con))));
}
/* The current server is not running
*
* Store server NOT running in server and monitor server pending struct
@ -445,6 +439,18 @@ char *server_string;
monitor_clear_pending_status(database, SERVER_SLAVE_OF_EXTERNAL_MASTER);
monitor_clear_pending_status(database, SERVER_STALE_STATUS);
/* Log connect failure only once */
if (mon_status_changed(database) && mon_print_fail_status(database))
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Monitor was unable to connect to "
"server %s:%d : \"%s\"",
database->server->name,
database->server->port,
mysql_error(database->con))));
}
return;
}
else
@ -601,8 +607,9 @@ MONITOR_SERVERS *ptr;
int replication_heartbeat = handle->replicationHeartbeat;
int detect_stale_master = handle->detectStaleMaster;
int num_servers=0;
MONITOR_SERVERS *root_master;
MONITOR_SERVERS *root_master = NULL;
size_t nrounds = 0;
int log_no_master = 1;
if (mysql_thread_init())
{
@ -667,21 +674,21 @@ size_t nrounds = 0;
dcb_call_foreach(DCB_REASON_NOT_RESPONDING);
}
if (mon_status_changed(ptr) ||
mon_print_fail_status(ptr))
if (mon_status_changed(ptr))
{
LOGIF(LM, (skygw_log_write_flush(
LOGFILE_MESSAGE,
LOGIF(LD, (skygw_log_write_flush(
LOGFILE_DEBUG,
"Backend server %s:%d state : %s",
ptr->server->name,
ptr->server->port,
STRSRVSTATUS(ptr->server))));
}
if (SERVER_IS_DOWN(ptr->server))
{
/** Increase this server'e error count */
ptr->mon_err_count += 1;
STRSRVSTATUS(ptr->server))));
}
if (SERVER_IS_DOWN(ptr->server))
{
/** Increase this server'e error count */
ptr->mon_err_count += 1;
}
else
{
/** Reset this server's error count */
@ -690,9 +697,26 @@ size_t nrounds = 0;
ptr = ptr->next;
}
/* Compute the replication tree */
root_master = get_replication_tree(handle, num_servers);
ptr = handle->databases;
/* if only one server is configured, that's is Master */
if (num_servers == 1) {
if (SERVER_IS_RUNNING(ptr->server)) {
ptr->server->depth = 0;
/* status cleanup */
monitor_clear_pending_status(ptr, SERVER_SLAVE);
/* master status set */
monitor_set_pending_status(ptr, SERVER_MASTER);
ptr->server->depth = 0;
handle->master = ptr;
root_master = ptr;
}
} else {
/* Compute the replication tree */
root_master = get_replication_tree(handle, num_servers);
}
/* Update server status from monitor pending status on that server*/
@ -702,18 +726,56 @@ size_t nrounds = 0;
if (! SERVER_IN_MAINT(ptr->server)) {
/* If "detect_stale_master" option is On, let's use the previus master */
if (detect_stale_master && root_master && (!strcmp(ptr->server->name, root_master->server->name) && ptr->server->port == root_master->server->port) && (ptr->server->status & SERVER_MASTER) && !(ptr->pending_status & SERVER_MASTER)) {
/* in this case server->status will not be updated from pending_status */
LOGIF(LM, (skygw_log_write_flush(
LOGFILE_MESSAGE, "[mysql_mon]: root server [%s:%i] is no longer Master, let's use it again even if it could be a stale master, you have been warned!", ptr->server->name, ptr->server->port)));
/* Set the STALE bit for this server in server struct */
/**
* In this case server->status will not be updated from pending_statu
* Set the STALE bit for this server in server struct
*/
server_set_status(ptr->server, SERVER_STALE_STATUS);
/* log it once */
if (mon_status_changed(ptr)) {
LOGIF(LM, (skygw_log_write_flush(
LOGFILE_MESSAGE, "[mysql_mon]: root server [%s:%i] is no longer Master,"
" let's use it again even if it could be a stale master,"
" you have been warned!",
ptr->server->name,
ptr->server->port)));
}
} else {
ptr->server->status = ptr->pending_status;
}
}
ptr = ptr->next;
}
/* log master detection failure od first master becomes available after failure */
if (root_master && mon_status_changed(root_master) && !(root_master->server->status & SERVER_STALE_STATUS)) {
if (root_master->pending_status & (SERVER_MASTER)) {
if (!(root_master->mon_prev_status & SERVER_STALE_STATUS)) {
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Info: A Master Server is now available: %s:%i",
root_master->server->name,
root_master->server->port)));
}
} else {
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error: No Master can be determined. Last known was %s:%i",
root_master->server->name,
root_master->server->port)));
}
log_no_master = 1;
} else {
if (!root_master && log_no_master) {
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error: No Master can be determined")));
log_no_master = 0;
}
}
/* Do now the heartbeat replication set/get for MySQL Replication Consistency */
if (replication_heartbeat && root_master && (SERVER_IS_MASTER(root_master->server) || SERVER_IS_RELAY_SERVER(root_master->server))) {
set_master_heartbeat(handle, root_master);
@ -785,6 +847,12 @@ MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg;
memcpy(&handle->detectStaleMaster, &enable, sizeof(int));
}
/**
* Check if current monitored server status has changed
*
* @param mon_srv The monitored server
* @return true if status has changed or false
*/
static bool mon_status_changed(
MONITOR_SERVERS* mon_srv)
{
@ -801,6 +869,12 @@ static bool mon_status_changed(
return succp;
}
/**
* Check if current monitored server has a loggable failure status
*
* @param mon_srv The monitored server
* @return true if failed status can be logged or false
*/
static bool mon_print_fail_status(
MONITOR_SERVERS* mon_srv)
{
@ -810,7 +884,7 @@ static bool mon_print_fail_status(
modval = 1<<(MIN(errcount/10, 7));
if (SERVER_IS_DOWN(mon_srv->server) && errcount%modval == 0)
if (SERVER_IS_DOWN(mon_srv->server) && errcount == 0)
{
succp = true;
}
@ -1155,6 +1229,7 @@ static MONITOR_SERVERS *get_replication_tree(MYSQL_MONITOR *handle, int num_serv
add_slave_to_master(master->server->slaves, MONITOR_MAX_NUM_SLAVES, current->node_id);
master->server->depth = current->depth -1;
monitor_set_pending_status(master, SERVER_MASTER);
handle->master = master;
} else {
if (current->master_id > 0) {
/* this server is slave of another server not in MaxScale configuration

View File

@ -42,7 +42,10 @@
#include <dcb.h>
#include <modinfo.h>
extern int lm_enabled_logfiles_bitmask;
/** Defined in log_manager.cc */
extern int lm_enabled_logfiles_bitmask;
extern size_t log_ses_count[];
extern __thread log_info_t tls_log_info;
static void monitorMain(void *);
@ -472,8 +475,8 @@ size_t nrounds = 0;
if (ptr->server->status != prev_status ||
SERVER_IS_DOWN(ptr->server))
{
LOGIF(LM, (skygw_log_write_flush(
LOGFILE_MESSAGE,
LOGIF(LD, (skygw_log_write_flush(
LOGFILE_DEBUG,
"Backend server %s:%d state : %s",
ptr->server->name,
ptr->server->port,

View File

@ -49,7 +49,10 @@ MODULE_INFO info = {
"An experimental HTTPD implementation for use in admnistration"
};
extern int lm_enabled_logfiles_bitmask;
/** Defined in log_manager.cc */
extern int lm_enabled_logfiles_bitmask;
extern size_t log_ses_count[];
extern __thread log_info_t tls_log_info;
#define ISspace(x) isspace((int)(x))
#define HTTP_SERVER_STRING "MaxScale(c) v.1.0.0"
@ -170,11 +173,11 @@ HTTPD_session *client_data = NULL;
i = 0;
while (ISspace(buf[j]) && (j < sizeof(buf))) {
while ( (j < sizeof(buf)) && ISspace(buf[j])) {
j++;
}
while (!ISspace(buf[j]) && (i < sizeof(url) - 1) && (j < sizeof(buf) - 1)) {
while ((j < sizeof(buf) - 1) && !ISspace(buf[j]) && (i < sizeof(url) - 1)) {
url[i] = buf[j];
i++; j++;
}
@ -351,6 +354,7 @@ int n_connect = 0;
n_connect++;
}
}
close(so);
}
return n_connect;

View File

@ -45,7 +45,10 @@ MODULE_INFO info = {
"A maxscale protocol for the administration interface"
};
extern int lm_enabled_logfiles_bitmask;
/** Defined in log_manager.cc */
extern int lm_enabled_logfiles_bitmask;
extern size_t log_ses_count[];
extern __thread log_info_t tls_log_info;
/**
* @file maxscaled.c - MaxScale administration protocol
@ -353,7 +356,13 @@ int rc;
}
// socket options
setsockopt(listener->fd, SOL_SOCKET, SO_REUSEADDR, (char *)&one, sizeof(one));
if (setsockopt(listener->fd, SOL_SOCKET, SO_REUSEADDR, (char *)&one, sizeof(one)))
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"Unable to set SO_REUSEADDR on maxscale listener."
)));
}
// set NONBLOCKING mode
setnonblocking(listener->fd);
// bind address and port

View File

@ -56,7 +56,10 @@ MODULE_INFO info = {
"The MySQL to backend server protocol"
};
extern int lm_enabled_logfiles_bitmask;
/** Defined in log_manager.cc */
extern int lm_enabled_logfiles_bitmask;
extern size_t log_ses_count[];
extern __thread log_info_t tls_log_info;
static char *version_str = "V2.0.0";
static int gw_create_backend_connection(DCB *backend, SERVER *server, SESSION *in_session);
@ -820,6 +823,19 @@ static int gw_error_backend_event(DCB *dcb)
*/
if (dcb->state != DCB_STATE_POLLING)
{
int error, len;
char buf[100];
len = sizeof(error);
if (getsockopt(dcb->fd, SOL_SOCKET, SO_ERROR, &error, &len) == 0)
{
strerror_r(error, buf, 100);
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"DCB in state %s got error '%s'.",
gw_dcb_state2string(dcb->state),
buf)));
}
return 1;
}
errbuf = mysql_create_custom_error(
@ -846,6 +862,18 @@ static int gw_error_backend_event(DCB *dcb)
if (ses_state != SESSION_STATE_ROUTER_READY)
{
int error, len;
char buf[100];
len = sizeof(error);
if (getsockopt(dcb->fd, SOL_SOCKET, SO_ERROR, &error, &len) == 0)
{
strerror_r(error, buf, 100);
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error '%s' in session that is not ready for routing.",
buf)));
}
gwbuf_free(errbuf);
goto retblock;
}
@ -1037,6 +1065,19 @@ gw_backend_hangup(DCB *dcb)
if (ses_state != SESSION_STATE_ROUTER_READY)
{
int error, len;
char buf[100];
len = sizeof(error);
if (getsockopt(dcb->fd, SOL_SOCKET, SO_ERROR, &error, &len) == 0)
{
strerror_r(error, buf, 100);
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Hangup in session that is not ready for routing, "
"Error reported is '%s'.",
buf)));
}
gwbuf_free(errbuf);
goto retblock;
}

View File

@ -54,7 +54,10 @@ MODULE_INFO info = {
"The client to MaxScale MySQL protocol implementation"
};
extern int lm_enabled_logfiles_bitmask;
/** Defined in log_manager.cc */
extern int lm_enabled_logfiles_bitmask;
extern size_t log_ses_count[];
extern __thread log_info_t tls_log_info;
static char *version_str = "V1.0.0";

View File

@ -45,7 +45,10 @@
#include <skygw_utils.h>
#include <log_manager.h>
extern int lm_enabled_logfiles_bitmask;
/** Defined in log_manager.cc */
extern int lm_enabled_logfiles_bitmask;
extern size_t log_ses_count[];
extern __thread log_info_t tls_log_info;
extern int gw_read_backend_event(DCB* dcb);
extern int gw_write_backend_event(DCB *dcb);

View File

@ -45,7 +45,10 @@ MODULE_INFO info = {
"A telnet deamon protocol for simple administration interface"
};
extern int lm_enabled_logfiles_bitmask;
/** Defined in log_manager.cc */
extern int lm_enabled_logfiles_bitmask;
extern size_t log_ses_count[];
extern __thread log_info_t tls_log_info;
/**
* @file telnetd.c - telnet daemon protocol module

View File

@ -1,4 +1,4 @@
add_library(binlogrouter SHARED blr.c blr_master.c blr_cache.c blr_slave.c blr_file.c)
set_target_properties(binlogrouter PROPERTIES INSTALL_RPATH ${CMAKE_INSTALL_RPATH}:${CMAKE_INSTALL_PREFIX}/lib)
target_link_libraries(binlogrouter ssl pthread log_manager ${EMBEDDED_LIB})
target_link_libraries(binlogrouter ssl pthread log_manager)
install(TARGETS binlogrouter DESTINATION modules)

View File

@ -31,10 +31,14 @@ CFLAGS=-c -fPIC -I/usr/include -I../../include -I../../../include \
include ../../../../makefile.inc
#LDFLAGS=-shared -L$(LOGPATH) -L$(QCLASSPATH) -L$(EMBEDDED_LIB) \
# -Wl,-rpath,$(DEST)/lib \
# -Wl,-rpath,$(LOGPATH) -Wl,-rpath,$(UTILSPATH) -Wl,-rpath,$(QCLASSPATH) \
# -Wl,-rpath,$(EMBEDDED_LIB)
LDFLAGS=-shared -L$(LOGPATH) -L$(QCLASSPATH) -L$(EMBEDDED_LIB) \
-Wl,-rpath,$(DEST)/lib \
-Wl,-rpath,$(LOGPATH) -Wl,-rpath,$(UTILSPATH) -Wl,-rpath,$(QCLASSPATH) \
-Wl,-rpath,$(EMBEDDED_LIB)
-Wl,-rpath,$(LOGPATH) -Wl,-rpath,$(UTILSPATH)
SRCS=blr.c blr_master.c blr_cache.c blr_slave.c blr_file.c
OBJ=$(SRCS:.c=.o)

View File

@ -39,6 +39,7 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <ctype.h>
#include <time.h>
#include <service.h>
#include <server.h>
@ -48,6 +49,7 @@
#include <blr.h>
#include <dcb.h>
#include <spinlock.h>
#include <housekeeper.h>
#include <time.h>
#include <skygw_types.h>
@ -95,6 +97,8 @@ static ROUTER_OBJECT MyObject = {
getCapabilities
};
static void stats_func(void *);
static bool rses_begin_locked_router_action(ROUTER_SLAVE *);
static void rses_end_locked_router_action(ROUTER_SLAVE *);
@ -157,7 +161,7 @@ static ROUTER *
createInstance(SERVICE *service, char **options)
{
ROUTER_INSTANCE *inst;
char *value;
char *value, *name;
int i;
if ((inst = calloc(1, sizeof(ROUTER_INSTANCE))) == NULL) {
@ -169,10 +173,19 @@ int i;
inst->service = service;
spinlock_init(&inst->lock);
inst->files = NULL;
spinlock_init(&inst->fileslock);
spinlock_init(&inst->binlog_lock);
inst->binlog_fd = -1;
inst->low_water = DEF_LOW_WATER;
inst->high_water = DEF_HIGH_WATER;
inst->initbinlog = 0;
inst->short_burst = DEF_SHORT_BURST;
inst->long_burst = DEF_LONG_BURST;
inst->burst_size = DEF_BURST_SIZE;
inst->retry_backoff = 1;
/*
* We only support one server behind this router, since the server is
@ -249,6 +262,10 @@ int i;
{
inst->initbinlog = atoi(value);
}
else if (strcmp(options[i], "file") == 0)
{
inst->initbinlog = atoi(value);
}
else if (strcmp(options[i], "lowwater") == 0)
{
inst->low_water = atoi(value);
@ -257,6 +274,38 @@ int i;
{
inst->high_water = atoi(value);
}
else if (strcmp(options[i], "shortburst") == 0)
{
inst->short_burst = atoi(value);
}
else if (strcmp(options[i], "longburst") == 0)
{
inst->long_burst = atoi(value);
}
else if (strcmp(options[i], "burstsize") == 0)
{
unsigned long size = atoi(value);
char *ptr = value;
while (*ptr && isdigit(*ptr))
ptr++;
switch (*ptr)
{
case 'G':
case 'g':
size = size * 1024 * 1000 * 1000;
break;
case 'M':
case 'm':
size = size * 1024 * 1000;
break;
case 'K':
case 'k':
size = size * 1024;
break;
}
inst->burst_size = size;
}
else
{
LOGIF(LE, (skygw_log_write(
@ -284,6 +333,7 @@ int i;
inst->active_logs = 0;
inst->reconnect_pending = 0;
inst->handling_threads = 0;
inst->rotating = 0;
inst->residual = NULL;
inst->slaves = NULL;
inst->next = NULL;
@ -302,6 +352,12 @@ int i;
*/
blr_init_cache(inst);
if ((name = (char *)malloc(80)) != NULL)
{
sprintf(name, "%s stats", service->name);
hktask_add(name, stats_func, inst, BLR_STATS_FREQ);
}
/*
* Now start the replication from the master to MaxScale
*/
@ -358,6 +414,8 @@ ROUTER_SLAVE *slave;
spinlock_init(&slave->catch_lock);
slave->dcb = session->client;
slave->router = inst;
slave->file = NULL;
strcpy(slave->binlogfile, "unassigned");
/**
* Add this session to the list of active sessions.
@ -477,6 +535,9 @@ ROUTER_SLAVE *slave = (ROUTER_SLAVE *)router_session;
*/
slave->state = BLRS_UNREGISTERED;
if (slave->file)
blr_close_binlog(router, slave->file);
/* Unlock */
rses_end_locked_router_action(slave);
}
@ -541,7 +602,9 @@ diagnostics(ROUTER *router, DCB *dcb)
{
ROUTER_INSTANCE *router_inst = (ROUTER_INSTANCE *)router;
ROUTER_SLAVE *session;
int i = 0;
int i = 0, j;
int minno = 0;
double min5, min10, min15, min30;
char buf[40];
struct tm tm;
@ -554,6 +617,30 @@ struct tm tm;
}
spinlock_release(&router_inst->lock);
minno = router_inst->stats.minno;
min30 = 0.0;
min15 = 0.0;
min10 = 0.0;
min5 = 0.0;
for (j = 0; j < 30; j++)
{
minno--;
if (minno < 0)
minno += 30;
min30 += router_inst->stats.minavgs[minno];
if (j < 15)
min15 += router_inst->stats.minavgs[minno];
if (j < 10)
min10 += router_inst->stats.minavgs[minno];
if (j < 5)
min5 += router_inst->stats.minavgs[minno];
}
min30 /= 30.0;
min15 /= 15.0;
min10 /= 10.0;
min5 /= 5.0;
dcb_printf(dcb, "\tMaster connection DCB: %p\n",
router_inst->master);
dcb_printf(dcb, "\tMaster connection state: %s\n",
@ -574,6 +661,13 @@ struct tm tm;
router_inst->stats.n_slaves);
dcb_printf(dcb, "\tNumber of binlog events received: %u\n",
router_inst->stats.n_binlogs);
minno = router_inst->stats.minno - 1;
if (minno == -1)
minno = 30;
dcb_printf(dcb, "\tNumber of binlog events per minute\n");
dcb_printf(dcb, "\tCurrent 5 10 15 30 Min Avg\n");
dcb_printf(dcb, "\t %6d %8.1f %8.1f %8.1f %8.1f\n",
router_inst->stats.minavgs[minno], min5, min10, min15, min30);
dcb_printf(dcb, "\tNumber of fake binlog events: %u\n",
router_inst->stats.n_fakeevents);
dcb_printf(dcb, "\tNumber of artificial binlog events: %u\n",
@ -582,10 +676,6 @@ struct tm tm;
router_inst->stats.n_binlog_errors);
dcb_printf(dcb, "\tNumber of binlog rotate events: %u\n",
router_inst->stats.n_rotates);
dcb_printf(dcb, "\tNumber of binlog cache hits: %u\n",
router_inst->stats.n_cachehits);
dcb_printf(dcb, "\tNumber of binlog cache misses: %u\n",
router_inst->stats.n_cachemisses);
dcb_printf(dcb, "\tNumber of heartbeat events: %u\n",
router_inst->stats.n_heartbeats);
dcb_printf(dcb, "\tNumber of packets received: %u\n",
@ -615,6 +705,8 @@ struct tm tm;
spinlock_stats(&instlock, spin_reporter, dcb);
dcb_printf(dcb, "\tSpinlock statistics (instance lock):\n");
spinlock_stats(&router_inst->lock, spin_reporter, dcb);
dcb_printf(dcb, "\tSpinlock statistics (binlog position lock):\n");
spinlock_stats(&router_inst->binlog_lock, spin_reporter, dcb);
#endif
if (router_inst->slaves)
@ -624,6 +716,29 @@ struct tm tm;
session = router_inst->slaves;
while (session)
{
minno = session->stats.minno;
min30 = 0.0;
min15 = 0.0;
min10 = 0.0;
min5 = 0.0;
for (j = 0; j < 30; j++)
{
minno--;
if (minno < 0)
minno += 30;
min30 += session->stats.minavgs[minno];
if (j < 15)
min15 += session->stats.minavgs[minno];
if (j < 10)
min10 += session->stats.minavgs[minno];
if (j < 5)
min5 += session->stats.minavgs[minno];
}
min30 /= 30.0;
min15 /= 15.0;
min10 /= 10.0;
min5 /= 5.0;
dcb_printf(dcb, "\t\tServer-id: %d\n", session->serverid);
if (session->hostname)
dcb_printf(dcb, "\t\tHostname: %s\n", session->hostname);
@ -637,14 +752,18 @@ struct tm tm;
dcb_printf(dcb, "\t\tNo. requests: %u\n", session->stats.n_requests);
dcb_printf(dcb, "\t\tNo. events sent: %u\n", session->stats.n_events);
dcb_printf(dcb, "\t\tNo. bursts sent: %u\n", session->stats.n_bursts);
minno = session->stats.minno - 1;
if (minno == -1)
minno = 30;
dcb_printf(dcb, "\t\tNumber of binlog events per minute\n");
dcb_printf(dcb, "\t\tCurrent 5 10 15 30 Min Avg\n");
dcb_printf(dcb, "\t\t %6d %8.1f %8.1f %8.1f %8.1f\n",
session->stats.minavgs[minno], min5, min10,
min15, min30);
dcb_printf(dcb, "\t\tNo. flow control: %u\n", session->stats.n_flows);
dcb_printf(dcb, "\t\tNo. catchup NRs: %u\n", session->stats.n_catchupnr);
dcb_printf(dcb, "\t\tNo. already up to date: %u\n", session->stats.n_alreadyupd);
dcb_printf(dcb, "\t\tNo. up to date: %u\n", session->stats.n_upd);
dcb_printf(dcb, "\t\tNo. of low water cbs %u\n", session->stats.n_cb);
dcb_printf(dcb, "\t\tNo. of drained cbs %u\n", session->stats.n_dcb);
dcb_printf(dcb, "\t\tNo. of low water cbs N/A %u\n", session->stats.n_cbna);
dcb_printf(dcb, "\t\tNo. of events > high water %u\n", session->stats.n_above);
dcb_printf(dcb, "\t\tNo. of failed reads %u\n", session->stats.n_failed_read);
dcb_printf(dcb, "\t\tNo. of nested distribute events %u\n", session->stats.n_overrun);
dcb_printf(dcb, "\t\tNo. of distribute action 1 %u\n", session->stats.n_actions[0]);
@ -652,9 +771,11 @@ struct tm tm;
dcb_printf(dcb, "\t\tNo. of distribute action 3 %u\n", session->stats.n_actions[2]);
if ((session->cstate & CS_UPTODATE) == 0)
{
dcb_printf(dcb, "\t\tSlave is in catchup mode. %s\n",
((session->cstate & CS_EXPECTCB) == 0 ? "" :
"Waiting for DCB queue to drain."));
dcb_printf(dcb, "\t\tSlave is in catchup mode. %s%s\n",
((session->cstate & CS_EXPECTCB) == 0 ? "" :
"Waiting for DCB queue to drain."),
((session->cstate & CS_BUSY) == 0 ? "" :
" Busy in slave catchup."));
}
else
@ -672,7 +793,7 @@ struct tm tm;
dcb_printf(dcb, "\tSpinlock statistics (rses_lock):\n");
spinlock_stats(&session->rses_lock, spin_reporter, dcb);
#endif
dcb_printf(dcb, "\n");
session = session->next;
}
spinlock_release(&router_inst->lock);
@ -718,9 +839,24 @@ ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance;
static void
errorReply(ROUTER *instance, void *router_session, GWBUF *message, DCB *backend_dcb, error_action_t action, bool *succp)
{
ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance;
int error, len;
char msg[85];
len = sizeof(error);
if (getsockopt(router->master->fd, SOL_SOCKET, SO_ERROR, &error, &len) != 0)
{
strerror_r(error, msg, 80);
strcat(msg, " ");
}
else
strcpy(msg, "");
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR, "Erorr Reply '%s'", message)));
*succp = false;
LOGFILE_ERROR, "Master connection '%s', %sattempting reconnect to master",
message, msg)));
*succp = true;
blr_master_reconnect(router);
}
/** to be inline'd */
@ -776,3 +912,35 @@ static uint8_t getCapabilities(ROUTER *inst, void *router_session)
{
return 0;
}
/**
* The stats gathering function called from the housekeeper so that we
* can get timed averages of binlog records shippped
*
* @param inst The router instance
*/
static void
stats_func(void *inst)
{
ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)inst;
ROUTER_SLAVE *slave;
router->stats.minavgs[router->stats.minno++]
= router->stats.n_binlogs - router->stats.lastsample;
router->stats.lastsample = router->stats.n_binlogs;
if (router->stats.minno == BLR_NSTATS_MINUTES)
router->stats.minno = 0;
spinlock_acquire(&router->lock);
slave = router->slaves;
while (slave)
{
slave->stats.minavgs[slave->stats.minno++]
= slave->stats.n_events - slave->stats.lastsample;
slave->stats.lastsample = slave->stats.n_events;
if (slave->stats.minno == BLR_NSTATS_MINUTES)
slave->stats.minno = 0;
slave = slave->next;
}
spinlock_release(&router->lock);
}

View File

@ -50,12 +50,13 @@
#include <skygw_utils.h>
#include <log_manager.h>
extern int lm_enabled_logfiles_bitmask;
static void blr_file_create(ROUTER_INSTANCE *router, char *file);
static void blr_file_append(ROUTER_INSTANCE *router, char *file);
static uint32_t extract_field(uint8_t *src, int bits);
static void blr_log_header(logfile_id_t file, char *msg, uint8_t *ptr);
/**
* Initialise the binlog file for this instance. MaxScale will look
@ -85,6 +86,8 @@ struct dirent *dp;
if (access(path, R_OK) == -1)
mkdir(path, 0777);
router->binlogdir = strdup(path);
/* First try to find a binlog file number by reading the directory */
root_len = strlen(router->fileroot);
dirp = opendir(path);
@ -146,17 +149,11 @@ blr_file_rotate(ROUTER_INSTANCE *router, char *file, uint64_t pos)
static void
blr_file_create(ROUTER_INSTANCE *router, char *file)
{
char *ptr, path[1024];
char path[1024];
int fd;
unsigned char magic[] = BINLOG_MAGIC;
strcpy(path, "/usr/local/skysql/MaxScale");
if ((ptr = getenv("MAXSCALE_HOME")) != NULL)
{
strcpy(path, ptr);
}
strcat(path, "/");
strcat(path, router->service->name);
strcpy(path, router->binlogdir);
strcat(path, "/");
strcat(path, file);
@ -167,12 +164,14 @@ unsigned char magic[] = BINLOG_MAGIC;
else
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Failed to create binlog file %s\n", path)));
"Failed to create binlog file %s", path)));
}
fsync(fd);
close(router->binlog_fd);
spinlock_acquire(&router->binlog_lock);
strcpy(router->binlog_name, file);
router->binlog_position = 4; /* Initial position after the magic number */
spinlock_release(&router->binlog_lock);
router->binlog_fd = fd;
}
@ -186,30 +185,26 @@ unsigned char magic[] = BINLOG_MAGIC;
static void
blr_file_append(ROUTER_INSTANCE *router, char *file)
{
char *ptr, path[1024];
char path[1024];
int fd;
strcpy(path, "/usr/local/skysql/MaxScale");
if ((ptr = getenv("MAXSCALE_HOME")) != NULL)
{
strcpy(path, ptr);
}
strcat(path, "/");
strcat(path, router->service->name);
strcpy(path, router->binlogdir);
strcat(path, "/");
strcat(path, file);
if ((fd = open(path, O_RDWR|O_APPEND, 0666)) == -1)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Failed to open binlog file %s for append.\n",
"Failed to open binlog file %s for append.",
path)));
return;
}
fsync(fd);
close(router->binlog_fd);
spinlock_acquire(&router->binlog_lock);
strcpy(router->binlog_name, file);
router->binlog_position = lseek(fd, 0L, SEEK_END);
spinlock_release(&router->binlog_lock);
router->binlog_fd = fd;
}
@ -224,7 +219,10 @@ void
blr_write_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *buf)
{
pwrite(router->binlog_fd, buf, hdr->event_size, hdr->next_pos - hdr->event_size);
spinlock_acquire(&router->binlog_lock);
router->binlog_position = hdr->next_pos;
router->last_written = hdr->next_pos - hdr->event_size;
spinlock_release(&router->binlog_lock);
}
/**
@ -238,97 +236,272 @@ blr_file_flush(ROUTER_INSTANCE *router)
fsync(router->binlog_fd);
}
int
/**
* Open a binlog file for reading binlog records
*
* @param router The router instance
* @param binlog The binlog filename
* @return a binlog file record
*/
BLFILE *
blr_open_binlog(ROUTER_INSTANCE *router, char *binlog)
{
char *ptr, path[1024];
int rval;
BLFILE *file;
strcpy(path, "/usr/local/skysql/MaxScale");
if ((ptr = getenv("MAXSCALE_HOME")) != NULL)
spinlock_acquire(&router->fileslock);
file = router->files;
while (file && strcmp(file->binlogname, binlog) != 0)
file = file->next;
if (file)
{
strcpy(path, ptr);
file->refcnt++;
spinlock_release(&router->fileslock);
return file;
}
strcat(path, "/");
strcat(path, router->service->name);
if ((file = (BLFILE *)malloc(sizeof(BLFILE))) == NULL)
{
spinlock_release(&router->fileslock);
return NULL;
}
strcpy(file->binlogname, binlog);
file->refcnt = 1;
file->cache = 0;
spinlock_init(&file->lock);
strcpy(path, router->binlogdir);
strcat(path, "/");
strcat(path, binlog);
if ((rval = open(path, O_RDONLY, 0666)) == -1)
if ((file->fd = open(path, O_RDONLY, 0666)) == -1)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Failed to open binlog file %s\n", path)));
"Failed to open binlog file %s", path)));
free(file);
spinlock_release(&router->fileslock);
return NULL;
}
return rval;
file->next = router->files;
router->files = file;
spinlock_release(&router->fileslock);
return file;
}
/**
* Read a replication event into a GWBUF structure.
*
* @param fd File descriptor of the binlog file
* @param pos Position of binlog record to read
* @param hdr Binlog header to populate
* @return The binlog record wrapped in a GWBUF structure
* @param router The router instance
* @param file File record
* @param pos Position of binlog record to read
* @param hdr Binlog header to populate
* @return The binlog record wrapped in a GWBUF structure
*/
GWBUF *
blr_read_binlog(int fd, unsigned int pos, REP_HEADER *hdr)
blr_read_binlog(ROUTER_INSTANCE *router, BLFILE *file, unsigned int pos, REP_HEADER *hdr)
{
uint8_t hdbuf[19];
GWBUF *result;
unsigned char *data;
int n;
unsigned long filelen = 0;
struct stat statb;
if (lseek(fd, pos, SEEK_SET) != pos)
if (fstat(file->fd, &statb) == 0)
filelen = statb.st_size;
if (pos >= filelen)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Failed to seek for binlog entry, "
"at %d.\n", pos)));
LOGIF(LD, (skygw_log_write(LOGFILE_ERROR,
"Attempting to read off the end of the binlog file %s, "
"event at %lu.", file->binlogname, pos)));
return NULL;
}
if (strcmp(router->binlog_name, file->binlogname) == 0 &&
pos >= router->binlog_position)
{
return NULL;
}
/* Read the header information from the file */
if ((n = read(fd, hdbuf, 19)) != 19)
if ((n = pread(file->fd, hdbuf, 19, pos)) != 19)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Failed to read header for binlog entry, "
"at %d (%s).\n", pos, strerror(errno))));
if (n> 0 && n < 19)
switch (n)
{
case 0:
LOGIF(LD, (skygw_log_write(LOGFILE_DEBUG,
"Reached end of binlog file at %d.",
pos)));
break;
case -1:
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Failed to read binlog file %s at position %d"
" (%s).", file->binlogname,
pos, strerror(errno))));
if (errno == EBADF)
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Bad file descriptor in read binlog for file %s"
", reference count is %d, descriptor %d.",
file->binlogname, file->refcnt, file->fd)));
break;
default:
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Short read when reading the header. "
"Expected 19 bytes got %d bytes.\n",
n)));
"Expected 19 bytes but got %d bytes. "
"Binlog file is %s, position %d",
file->binlogname, pos, n)));
break;
}
return NULL;
}
hdr->timestamp = extract_field(hdbuf, 32);
hdr->timestamp = EXTRACT32(hdbuf);
hdr->event_type = hdbuf[4];
hdr->serverid = extract_field(&hdbuf[5], 32);
hdr->serverid = EXTRACT32(&hdbuf[5]);
hdr->event_size = extract_field(&hdbuf[9], 32);
hdr->next_pos = extract_field(&hdbuf[13], 32);
hdr->flags = extract_field(&hdbuf[17], 16);
hdr->next_pos = EXTRACT32(&hdbuf[13]);
hdr->flags = EXTRACT16(&hdbuf[17]);
if (hdr->next_pos < pos && hdr->event_type != ROTATE_EVENT)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Next position in header appears to be incorrect "
"rereading event header at pos %ul in file %s, "
"file size is %ul. Master will write %ul in %s next.",
pos, file->binlogname, filelen, router->binlog_position,
router->binlog_name)));
if ((n = pread(file->fd, hdbuf, 19, pos)) != 19)
{
switch (n)
{
case 0:
LOGIF(LD, (skygw_log_write(LOGFILE_DEBUG,
"Reached end of binlog file at %d.",
pos)));
break;
case -1:
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Failed to read binlog file %s at position %d"
" (%s).", file->binlogname,
pos, strerror(errno))));
if (errno == EBADF)
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Bad file descriptor in read binlog for file %s"
", reference count is %d, descriptor %d.",
file->binlogname, file->refcnt, file->fd)));
break;
default:
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Short read when reading the header. "
"Expected 19 bytes but got %d bytes. "
"Binlog file is %s, position %d",
file->binlogname, pos, n)));
break;
}
return NULL;
}
hdr->timestamp = EXTRACT32(hdbuf);
hdr->event_type = hdbuf[4];
hdr->serverid = EXTRACT32(&hdbuf[5]);
hdr->event_size = extract_field(&hdbuf[9], 32);
hdr->next_pos = EXTRACT32(&hdbuf[13]);
hdr->flags = EXTRACT16(&hdbuf[17]);
if (hdr->next_pos < pos && hdr->event_type != ROTATE_EVENT)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Next position still incorrect after "
"rereading")));
return NULL;
}
else
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Next position corrected by "
"rereading")));
}
}
if ((result = gwbuf_alloc(hdr->event_size)) == NULL)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Failed to allocate memory for binlog entry, "
"size %d at %d.\n",
"size %d at %d.",
hdr->event_size, pos)));
return NULL;
}
data = GWBUF_DATA(result);
memcpy(data, hdbuf, 19); // Copy the header in
if ((n = read(fd, &data[19], hdr->event_size - 19))
if ((n = pread(file->fd, &data[19], hdr->event_size - 19, pos + 19))
!= hdr->event_size - 19) // Read the balance
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Short read when reading the event at %d. "
"Expected %d bytes got %d bytes.\n",
pos, n)));
if (n == -1)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Error reading the event at %ld in %s. "
"%s, expected %d bytes.",
pos, file->binlogname,
strerror(errno), hdr->event_size - 19)));
}
else
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Short read when reading the event at %ld in %s. "
"Expected %d bytes got %d bytes.",
pos, file->binlogname, hdr->event_size - 19, n)));
if (filelen != 0 && filelen - pos < hdr->event_size)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Binlog event is close to the end of the binlog file, "
"current file size is %u.",
filelen)));
}
blr_log_header(LOGFILE_ERROR, "Possible malformed event header", hdbuf);
}
gwbuf_consume(result, hdr->event_size);
return NULL;
}
return result;
}
/**
* Close a binlog file that has been opened to read binlog records
*
* The open binlog files are shared between multiple slaves that are
* reading the same binlog file.
*
* @param router The router instance
* @param file The file to close
*/
void
blr_close_binlog(ROUTER_INSTANCE *router, BLFILE *file)
{
spinlock_acquire(&file->lock);
file->refcnt--;
if (file->refcnt == 0)
{
spinlock_acquire(&router->fileslock);
if (router->files == file)
router->files = file->next;
else
{
BLFILE *ptr = router->files;
while (ptr && ptr->next != file)
ptr = ptr->next;
if (ptr)
ptr->next = file->next;
}
spinlock_release(&router->fileslock);
close(file->fd);
file->fd = -1;
}
spinlock_release(&file->lock);
if (file->refcnt == 0)
free(file);
}
/**
* Extract a numeric field from a packet of the specified number of bits
*
@ -348,3 +521,40 @@ uint32_t rval = 0, shift = 0;
}
return rval;
}
/**
* Log the event header of binlog event
*
* @param file The log file into which to write the entry
* @param msg A message strign to preceed the header with
* @param ptr The event header raw data
*/
static void
blr_log_header(logfile_id_t file, char *msg, uint8_t *ptr)
{
char buf[400], *bufp;
int i;
bufp = buf;
bufp += sprintf(bufp, "%s: ", msg);
for (i = 0; i < 19; i++)
bufp += sprintf(bufp, "0x%02x ", ptr[i]);
skygw_log_write_flush(file, "%s", buf);
}
/**
* Return the size of the current binlog file
*
* @param file The binlog file
* @return The current size of the binlog file
*/
unsigned long
blr_file_size(BLFILE *file)
{
struct stat statb;
if (fstat(file->fd, &statb) == 0)
return statb.st_size;
return 0;
}

View File

@ -47,6 +47,7 @@
#include <blr.h>
#include <dcb.h>
#include <spinlock.h>
#include <housekeeper.h>
#include <sys/types.h>
#include <sys/socket.h>
@ -60,39 +61,6 @@
/* Temporary requirement for auth data */
#include <mysql_client_server_protocol.h>
#define SAMPLE_COUNT 10000
CYCLES samples[10][SAMPLE_COUNT];
int sample_index[10] = { 0, 0, 0 };
#define LOGD_SLAVE_CATCHUP1 0
#define LOGD_SLAVE_CATCHUP2 1
#define LOGD_DISTRIBUTE 2
#define LOGD_FILE_FLUSH 3
SPINLOCK logspin = SPINLOCK_INIT;
void
log_duration(int sample, CYCLES duration)
{
char fname[100];
int i;
FILE *fp;
spinlock_acquire(&logspin);
samples[sample][sample_index[sample]++] = duration;
if (sample_index[sample] == SAMPLE_COUNT)
{
sprintf(fname, "binlog_profile.%d", sample);
if ((fp = fopen(fname, "a")) != NULL)
{
for (i = 0; i < SAMPLE_COUNT; i++)
fprintf(fp, "%ld\n", samples[sample][i]);
fclose(fp);
}
sample_index[sample] = 0;
}
spinlock_release(&logspin);
}
extern int lm_enabled_logfiles_bitmask;
@ -126,7 +94,7 @@ GWBUF *buf;
if ((client = dcb_alloc(DCB_ROLE_INTERNAL)) == NULL)
{
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"Binlog router: failed to create DCB for dummy client\n")));
"Binlog router: failed to create DCB for dummy client")));
return;
}
router->client = client;
@ -134,14 +102,20 @@ GWBUF *buf;
if ((router->session = session_alloc(router->service, client)) == NULL)
{
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"Binlog router: failed to create session for connection to master\n")));
"Binlog router: failed to create session for connection to master")));
return;
}
client->session = router->session;
if ((router->master = dcb_connect(router->service->databases, router->session, BLR_PROTOCOL)) == NULL)
{
char *name = malloc(strlen(router->service->name) + strlen(" Master") + 1);
sprintf(name, "%s Master", router->service->name);
hktask_oneshot(name, blr_start_master, router,
BLR_MASTER_BACKOFF_TIME * router->retry_backoff++);
if (router->retry_backoff > BLR_MAX_BACKOFF)
router->retry_backoff = 1;
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"Binlog router: failed to connect to master server '%s'\n",
"Binlog router: failed to connect to master server '%s'",
router->service->databases->unique_name)));
return;
}
@ -155,6 +129,7 @@ perror("setsockopt");
router->master_state = BLRM_TIMESTAMP;
router->stats.n_masterstarts++;
router->retry_backoff = 1;
}
/**
@ -170,9 +145,7 @@ blr_restart_master(ROUTER_INSTANCE *router)
{
GWBUF *ptr;
dcb_close(router->master);
dcb_free(router->master);
dcb_free(router->client);
dcb_close(router->client);
/* Discard the queued residual data */
ptr = router->residual;
@ -252,7 +225,7 @@ char query[128];
if (router->master_state < 0 || router->master_state > BLRM_MAXSTATE)
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR, "Invalid master state machine state (%d) for binlog router.\n",
LOGFILE_ERROR, "Invalid master state machine state (%d) for binlog router.",
router->master_state)));
gwbuf_consume(buf, gwbuf_length(buf));
spinlock_acquire(&router->lock);
@ -274,7 +247,7 @@ char query[128];
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"Received error: %d, %s from master during %s phase of the master state machine.\n",
"Received error: %d, %s from master during %s phase of the master state machine.",
MYSQL_ERROR_CODE(buf), MYSQL_ERROR_MSG(buf), blrm_states[router->master_state]
)));
gwbuf_consume(buf, gwbuf_length(buf));
@ -548,23 +521,23 @@ static REP_HEADER phdr;
/* Get the length of the packet from the residual and new packet */
if (reslen >= 3)
{
len = extract_field(pdata, 24);
len = EXTRACT24(pdata);
}
else if (reslen == 2)
{
len = extract_field(pdata, 16);
len |= (extract_field(GWBUF_DATA(pkt->next), 8) << 16);
len = EXTRACT16(pdata);
len |= (*(uint8_t *)GWBUF_DATA(pkt->next) << 16);
}
else if (reslen == 1)
{
len = extract_field(pdata, 8);
len |= (extract_field(GWBUF_DATA(pkt->next), 16) << 8);
len = *pdata;
len |= (EXTRACT16(GWBUF_DATA(pkt->next)) << 8);
}
len += 4; // Allow space for the header
}
else
{
len = extract_field(pdata, 24) + 4;
len = EXTRACT24(pdata) + 4;
}
if (reslen < len && pkt_length >= len)
@ -585,7 +558,7 @@ static REP_HEADER phdr;
LOGIF(LE,(skygw_log_write(
LOGFILE_ERROR,
"Insufficient memory to buffer event "
"of %d bytes. Binlog %s @ %d\n.",
"of %d bytes. Binlog %s @ %d.",
len, router->binlog_name,
router->binlog_position)));
break;
@ -610,7 +583,7 @@ static REP_HEADER phdr;
LOGFILE_ERROR,
"Expected entire message in buffer "
"chain, but failed to create complete "
"message as expected. %s @ %d\n",
"message as expected. %s @ %d",
router->binlog_name,
router->binlog_position)));
free(msg);
@ -631,7 +604,7 @@ static REP_HEADER phdr;
router->stats.n_residuals++;
LOGIF(LD,(skygw_log_write(
LOGFILE_DEBUG,
"Residual data left after %d records. %s @ %d\n",
"Residual data left after %d records. %s @ %d",
router->stats.n_binlogs,
router->binlog_name, router->binlog_position)));
break;
@ -683,7 +656,7 @@ static REP_HEADER phdr;
// #define SHOW_EVENTS
#ifdef SHOW_EVENTS
printf("blr: event type 0x%02x, flags 0x%04x, event size %d\n", hdr.event_type, hdr.flags, hdr.event_size);
printf("blr: event type 0x%02x, flags 0x%04x, event size %d", hdr.event_type, hdr.flags, hdr.event_size);
#endif
if (hdr.event_type >= 0 && hdr.event_type < 0x24)
router->stats.events[hdr.event_type]++;
@ -692,7 +665,7 @@ static REP_HEADER phdr;
// Fake format description message
LOGIF(LD,(skygw_log_write(LOGFILE_DEBUG,
"Replication fake event. "
"Binlog %s @ %d.\n",
"Binlog %s @ %d.",
router->binlog_name,
router->binlog_position)));
router->stats.n_fakeevents++;
@ -721,7 +694,7 @@ static REP_HEADER phdr;
LOGIF(LD,(skygw_log_write(
LOGFILE_DEBUG,
"Replication heartbeat. "
"Binlog %s @ %d.\n",
"Binlog %s @ %d.",
router->binlog_name,
router->binlog_position)));
router->stats.n_heartbeats++;
@ -730,6 +703,8 @@ static REP_HEADER phdr;
{
ptr = ptr + 5; // We don't put the first byte of the payload
// into the binlog file
if (hdr.event_type == ROTATE_EVENT)
router->rotating = 1;
blr_write_binlog_record(router, &hdr, ptr);
if (hdr.event_type == ROTATE_EVENT)
{
@ -745,7 +720,7 @@ static REP_HEADER phdr;
"Artificial event not written "
"to disk or distributed. "
"Type 0x%x, Length %d, Binlog "
"%s @ %d\n.",
"%s @ %d.",
hdr.event_type,
hdr.event_size,
router->binlog_name,
@ -753,6 +728,7 @@ static REP_HEADER phdr;
ptr += 5;
if (hdr.event_type == ROTATE_EVENT)
{
router->rotating = 1;
blr_rotate_event(router, ptr, &hdr);
}
}
@ -762,7 +738,7 @@ static REP_HEADER phdr;
{
printf("Binlog router error: %s\n", &ptr[7]);
LOGIF(LE,(skygw_log_write(LOGFILE_ERROR,
"Error packet in binlog stream.%s @ %d\n.",
"Error packet in binlog stream.%s @ %d.",
router->binlog_name,
router->binlog_position)));
blr_log_packet(LOGFILE_ERROR, "Error Packet:",
@ -802,9 +778,7 @@ static REP_HEADER phdr;
{
ss_dassert(pkt_length == 0);
}
{ CYCLES start = rdtsc();
blr_file_flush(router);
log_duration(LOGD_FILE_FLUSH, rdtsc() - start); }
}
/**
@ -814,25 +788,25 @@ log_duration(LOGD_FILE_FLUSH, rdtsc() - start); }
* @param hdr The packet header to populate
*/
void
blr_extract_header(uint8_t *ptr, REP_HEADER *hdr)
blr_extract_header(register uint8_t *ptr, register REP_HEADER *hdr)
{
hdr->payload_len = extract_field(ptr, 24);
hdr->payload_len = EXTRACT24(ptr);
hdr->seqno = ptr[3];
hdr->ok = ptr[4];
hdr->timestamp = extract_field(&ptr[5], 32);
hdr->timestamp = EXTRACT32(&ptr[5]);
hdr->event_type = ptr[9];
hdr->serverid = extract_field(&ptr[10], 32);
hdr->event_size = extract_field(&ptr[14], 32);
hdr->next_pos = extract_field(&ptr[18], 32);
hdr->flags = extract_field(&ptr[22], 16);
hdr->serverid = EXTRACT32(&ptr[10]);
hdr->event_size = EXTRACT32(&ptr[14]);
hdr->next_pos = EXTRACT32(&ptr[18]);
hdr->flags = EXTRACT16(&ptr[22]);
}
/**
* Extract a numeric field from a packet of the specified number of bits
*
* @param src The raw packet source
* @param birs The number of bits to extract (multiple of 8)
* @param bits The number of bits to extract (multiple of 8)
*/
inline uint32_t
extract_field(register uint8_t *src, int bits)
@ -881,11 +855,13 @@ char file[BINLOG_FNAMELEN+1];
printf("New file: %s @ %ld\n", file, pos);
#endif
strcpy(router->prevbinlog, router->binlog_name);
if (strncmp(router->binlog_name, file, slen) != 0)
{
router->stats.n_rotates++;
blr_file_rotate(router, file, pos);
}
router->rotating = 0;
}
/**
@ -927,28 +903,35 @@ blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *
{
GWBUF *pkt;
uint8_t *buf;
ROUTER_SLAVE *slave;
ROUTER_SLAVE *slave, *nextslave;
int action;
CYCLES entry;
entry = rdtsc();
spinlock_acquire(&router->lock);
slave = router->slaves;
while (slave)
{
spinlock_acquire(&slave->catch_lock);
if ((slave->cstate & (CS_UPTODATE|CS_DIST)) == CS_UPTODATE)
if (slave->state != BLRS_DUMPING)
{
/* Slave is up to date with the binlog and no distribute is
* running on this slave.
slave = slave->next;
continue;
}
spinlock_acquire(&slave->catch_lock);
if ((slave->cstate & (CS_UPTODATE|CS_BUSY)) == CS_UPTODATE)
{
/*
* This slave is reporting it is to date with the binlog of the
* master running on this slave.
* It has no thread running currently that is sending binlog
* events.
*/
action = 1;
slave->cstate |= CS_DIST;
slave->cstate |= CS_BUSY;
}
else if ((slave->cstate & (CS_UPTODATE|CS_DIST)) == (CS_UPTODATE|CS_DIST))
else if ((slave->cstate & (CS_UPTODATE|CS_BUSY)) == (CS_UPTODATE|CS_BUSY))
{
/* Slave is up to date with the binlog and a distribute is
* running on this slave.
/*
* The slave is up to date with the binlog and a process is
* running on this slave to send binlog events.
*/
slave->overrun = 1;
action = 2;
@ -960,12 +943,20 @@ CYCLES entry;
}
slave->stats.n_actions[action-1]++;
spinlock_release(&slave->catch_lock);
if (action == 1)
{
if ((slave->binlog_pos == hdr->next_pos - hdr->event_size)
&& (strcmp(slave->binlogfile, router->binlog_name) == 0 ||
hdr->event_type == ROTATE_EVENT))
if (slave->binlog_pos == router->last_written &&
(strcmp(slave->binlogfile, router->binlog_name) == 0 ||
(hdr->event_type == ROTATE_EVENT &&
strcmp(slave->binlogfile, router->prevbinlog))))
{
/*
* The slave should be up to date, check that the binlog
* position matches the event we have to distribute or
* this is a rotate event. Send the event directly from
* memory to the slave.
*/
pkt = gwbuf_alloc(hdr->event_size + 5);
buf = GWBUF_DATA(pkt);
encode_value(buf, hdr->event_size + 1, 24);
@ -985,77 +976,90 @@ CYCLES entry;
spinlock_acquire(&slave->catch_lock);
if (slave->overrun)
{
CYCLES cycle_start, cycles;
slave->stats.n_overrun++;
slave->overrun = 0;
spinlock_release(&router->lock);
slave->cstate &= ~(CS_UPTODATE|CS_DIST);
spinlock_release(&slave->catch_lock);
cycle_start = rdtsc();
blr_slave_catchup(router, slave);
cycles = rdtsc() - cycle_start;
log_duration(LOGD_SLAVE_CATCHUP2, cycles);
spinlock_acquire(&router->lock);
slave = router->slaves;
if (slave)
continue;
else
break;
poll_fake_write_event(slave->dcb);
}
else
{
slave->cstate &= ~CS_DIST;
slave->cstate &= ~CS_BUSY;
}
spinlock_release(&slave->catch_lock);
}
else if (slave->binlog_pos == hdr->next_pos
&& strcmp(slave->binlogfile, router->binlog_name) == 0)
{
/*
* Slave has already read record from file, no
* need to distrbute this event
*/
spinlock_acquire(&slave->catch_lock);
slave->cstate &= ~CS_BUSY;
spinlock_release(&slave->catch_lock);
}
else if ((slave->binlog_pos > hdr->next_pos - hdr->event_size)
&& strcmp(slave->binlogfile, router->binlog_name) == 0)
{
/*
* The slave is ahead of the master, this should never
* happen. Force the slave to catchup mode in order to
* try to resolve the issue.
*/
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"Slave %d is ahead of expected position %s@%d. "
"Expected position %d",
slave->serverid, slave->binlogfile,
slave->binlog_pos,
hdr->next_pos - hdr->event_size)));
spinlock_acquire(&slave->catch_lock);
slave->cstate &= ~(CS_UPTODATE|CS_BUSY);
slave->cstate |= CS_EXPECTCB;
spinlock_release(&slave->catch_lock);
poll_fake_write_event(slave->dcb);
}
else if ((hdr->event_type != ROTATE_EVENT)
&& (slave->binlog_pos != hdr->next_pos - hdr->event_size ||
strcmp(slave->binlogfile, router->binlog_name) != 0))
else
{
/* Check slave is in catchup mode and if not
* force it to go into catchup mode.
/*
* The slave is not at the position it should be. Force it into
* catchup mode rather than send this event.
*/
if (slave->cstate & CS_UPTODATE)
{
CYCLES cycle_start, cycles;
spinlock_release(&router->lock);
LOGIF(LD, (skygw_log_write_flush(LOGFILE_DEBUG,
"Force slave %d into catchup mode %s@%d\n",
slave->serverid, slave->binlogfile,
slave->binlog_pos)));
spinlock_acquire(&slave->catch_lock);
slave->cstate &= ~(CS_UPTODATE|CS_DIST);
spinlock_release(&slave->catch_lock);
cycle_start = rdtsc();
blr_slave_catchup(router, slave);
cycles = rdtsc() - cycle_start;
log_duration(LOGD_SLAVE_CATCHUP1, cycles);
spinlock_acquire(&router->lock);
slave = router->slaves;
if (slave)
continue;
else
break;
}
spinlock_acquire(&slave->catch_lock);
slave->cstate &= ~(CS_UPTODATE|CS_BUSY);
slave->cstate |= CS_EXPECTCB;
spinlock_release(&slave->catch_lock);
poll_fake_write_event(slave->dcb);
}
}
else if (action == 3)
{
/* Slave is not up to date
* Check if it is either expecting a callback or
* is busy processing a callback
*/
spinlock_acquire(&slave->catch_lock);
if ((slave->cstate & (CS_EXPECTCB|CS_BUSY)) == 0)
{
slave->cstate |= CS_EXPECTCB;
spinlock_release(&slave->catch_lock);
poll_fake_write_event(slave->dcb);
}
else
spinlock_release(&slave->catch_lock);
}
slave = slave->next;
}
spinlock_release(&router->lock);
log_duration(LOGD_DISTRIBUTE, rdtsc() - entry);
}
/**
* Write a raw event (the first 40 bytes at most) to a log file
*
* @param file The logfile to write to
* @param msg A textual message to write before the packet
* @param ptr Pointer to the message buffer
* @param len Length of message packet
*/
static void
blr_log_packet(logfile_id_t file, char *msg, uint8_t *ptr, int len)
{
@ -1067,8 +1071,21 @@ int i;
for (i = 0; i < len && i < 40; i++)
bufp += sprintf(bufp, "0x%02x ", ptr[i]);
if (i < len)
skygw_log_write_flush(file, "%s...\n", buf);
skygw_log_write_flush(file, "%s...", buf);
else
skygw_log_write_flush(file, "%s\n", buf);
skygw_log_write_flush(file, "%s", buf);
}
/**
* Check if the master connection is in place and we
* are downlaoding binlogs
*
* @param router The router instance
* @return non-zero if we are recivign binlog records
*/
int
blr_master_connected(ROUTER_INSTANCE *router)
{
return router->master_state == BLRM_BINLOGDUMP;
}

View File

@ -47,12 +47,12 @@
#include <blr.h>
#include <dcb.h>
#include <spinlock.h>
#include <housekeeper.h>
#include <skygw_types.h>
#include <skygw_utils.h>
#include <log_manager.h>
static uint32_t extract_field(uint8_t *src, int bits);
static void encode_value(unsigned char *data, unsigned int value, int len);
static int blr_slave_query(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue);
@ -61,9 +61,10 @@ static void blr_slave_send_error(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, c
static int blr_slave_send_timestamp(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
static int blr_slave_register(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue);
static int blr_slave_binlog_dump(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue);
int blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
static uint8_t *blr_build_header(GWBUF *pkt, REP_HEADER *hdr);
static int blr_slave_callback(DCB *dcb, DCB_REASON reason, void *data);
int blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large);
uint8_t *blr_build_header(GWBUF *pkt, REP_HEADER *hdr);
int blr_slave_callback(DCB *dcb, DCB_REASON reason, void *data);
static int blr_slave_fake_rotate(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
extern int lm_enabled_logfiles_bitmask;
@ -89,7 +90,7 @@ blr_slave_request(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
if (slave->state < 0 || slave->state > BLRS_MAXSTATE)
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR, "Invalid slave state machine state (%d) for binlog router.\n",
LOGFILE_ERROR, "Invalid slave state machine state (%d) for binlog router.",
slave->state)));
gwbuf_consume(queue, gwbuf_length(queue));
return 0;
@ -109,13 +110,13 @@ blr_slave_request(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
break;
case COM_QUIT:
LOGIF(LD, (skygw_log_write(LOGFILE_DEBUG,
"COM_QUIT received from slave with server_id %d\n",
"COM_QUIT received from slave with server_id %d",
slave->serverid)));
break;
default:
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"Unexpected MySQL Command (%d) received from slave\n",
"Unexpected MySQL Command (%d) received from slave",
MYSQL_COMMAND(queue))));
break;
}
@ -166,7 +167,7 @@ int query_len;
query_text = strndup(qtext, query_len);
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE, "Execute statement from the slave '%s'\n", query_text)));
LOGFILE_TRACE, "Execute statement from the slave '%s'", query_text)));
/*
* Implement a very rudimental "parsing" of the query text by extarcting the
* words from the statement and matchng them against the subset of queries we
@ -269,7 +270,7 @@ int query_len;
query_text = strndup(qtext, query_len);
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR, "Unexpected query from slave server %s\n", query_text)));
LOGFILE_ERROR, "Unexpected query from slave server %s", query_text)));
free(query_text);
blr_slave_send_error(router, slave, "Unexpected SQL query received from slave.");
return 0;
@ -300,7 +301,7 @@ GWBUF *clone;
else
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Failed to clone server response to send to slave.\n")));
"Failed to clone server response to send to slave.")));
return 0;
}
}
@ -474,18 +475,19 @@ blr_slave_binlog_dump(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue
{
GWBUF *resp;
uint8_t *ptr;
int len, flags, serverid, rval;
int len, flags, serverid, rval, binlognamelen;
REP_HEADER hdr;
uint32_t chksum;
ptr = GWBUF_DATA(queue);
len = extract_field(ptr, 24);
binlognamelen = len - 11;
ptr += 4; // Skip length and sequence number
if (*ptr++ != COM_BINLOG_DUMP)
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"blr_slave_binlog_dump expected a COM_BINLOG_DUMP but received %d\n",
"blr_slave_binlog_dump expected a COM_BINLOG_DUMP but received %d",
*(ptr-1))));
return 0;
}
@ -496,15 +498,16 @@ uint32_t chksum;
ptr += 2;
serverid = extract_field(ptr, 32);
ptr += 4;
strncpy(slave->binlogfile, (char *)ptr, BINLOG_FNAMELEN);
strncpy(slave->binlogfile, (char *)ptr, binlognamelen);
slave->binlogfile[binlognamelen] = 0;
slave->state = BLRS_DUMPING;
slave->seqno = 1;
if (slave->nocrc)
len = 0x2b;
len = 19 + 8 + binlognamelen;
else
len = 0x2f;
len = 19 + 8 + 4 + binlognamelen;
// Build a fake rotate event
resp = gwbuf_alloc(len + 5);
@ -520,8 +523,8 @@ uint32_t chksum;
ptr = blr_build_header(resp, &hdr);
encode_value(ptr, slave->binlog_pos, 64);
ptr += 8;
memcpy(ptr, slave->binlogfile, BINLOG_FNAMELEN);
ptr += BINLOG_FNAMELEN;
memcpy(ptr, slave->binlogfile, binlognamelen);
ptr += binlognamelen;
if (!slave->nocrc)
{
@ -567,18 +570,24 @@ uint32_t chksum;
slave->dcb->low_water = router->low_water;
slave->dcb->high_water = router->high_water;
dcb_add_callback(slave->dcb, DCB_REASON_LOW_WATER, blr_slave_callback, slave);
dcb_add_callback(slave->dcb, DCB_REASON_DRAINED, blr_slave_callback, slave);
slave->state = BLRS_DUMPING;
LOGIF(LM, (skygw_log_write(
LOGFILE_MESSAGE,
"%s: New slave %s requested binlog file %s from position %lu",
router->service->name, slave->dcb->remote,
slave->binlogfile, slave->binlog_pos)));
if (slave->binlog_pos != router->binlog_position ||
strcmp(slave->binlogfile, router->binlog_name) != 0)
{
spinlock_acquire(&slave->catch_lock);
slave->cstate &= ~CS_UPTODATE;
slave->cstate |= CS_EXPECTCB;
spinlock_release(&slave->catch_lock);
rval = blr_slave_catchup(router, slave);
poll_fake_write_event(slave->dcb);
}
return rval;
}
@ -630,7 +639,7 @@ encode_value(unsigned char *data, unsigned int value, int len)
* @param hdr The packet header to populate
* @return A pointer to the first byte following the event header
*/
static uint8_t *
uint8_t *
blr_build_header(GWBUF *pkt, REP_HEADER *hdr)
{
uint8_t *ptr;
@ -660,229 +669,225 @@ uint8_t *ptr;
* We have a registered slave that is behind the current leading edge of the
* binlog. We must replay the log entries to bring this node up to speed.
*
* There may be a large numebr of records to send to the slave, the process
* There may be a large number of records to send to the slave, the process
* is triggered by the slave COM_BINLOG_DUMP message and all the events must
* be sent without receiving any new event. This measn there is no trigger into
* MaxScale other than this initial message. However, if we simply send all the
* events we end up with an extremely long write queue on the DCB and risk running
* the server out of resources.
* events we end up with an extremely long write queue on the DCB and risk
* running the server out of resources.
*
* To resolve this the concept of high and low water marks within the DCB has been
* added, with the ability for the DCB code to call user defined callbacks when the
* write queue is completely drained, when it crosses above the high water mark and
* when it crosses below the low water mark.
*
* The blr_slave_catchup routine will send binlog events to the slave until the high
* water mark is reached, at which point it will return. Later, when a low water mark
* callback is generated by the code that drains the DCB of data the blr_slave_catchup
* routine will again be called to write more events. The process is repeated until
* the slave has caught up with the master.
* The slave catchup routine will send a burst of replication events per single
* call. The paramter "long" control the number of events in the burst. The
* short burst is intended to be used when the master receive an event and
* needs to put the slave into catchup mode. This prevents the slave taking
* too much tiem away from the thread that is processing the master events.
*
* Note: an additional check that the DCB is still above the low water mark is done
* prior to the return from this function to allow for any delays due to the call to
* the close system call, since this may cause thread rescheduling.
* At the end of the burst a fake EPOLLOUT event is added to the poll event
* queue. This ensures that the slave callback for processing DCB write drain
* will be called and future catchup requests will be handled on another thread.
*
* @param router The binlog router
* @param slave The slave that is behind
* @param large Send a long or short burst of events
* @return The number of bytes written
*/
int
blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave)
blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large)
{
GWBUF *head, *record;
REP_HEADER hdr;
int written, fd, rval = 1, burst = 0;
int written, rval = 1, burst;
int rotating;
unsigned long burst_size;
uint8_t *ptr;
struct timespec req;
if (large)
burst = router->long_burst;
else
burst = router->short_burst;
burst_size = router->burst_size;
spinlock_acquire(&slave->catch_lock);
slave->cstate &= ~CS_EXPECTCB;
spinlock_release(&slave->catch_lock);
doitagain:
/*
* We have a slightly complex syncronisation mechansim here,
* we need to make sure that we do not have multiple threads
* running the catchup loop, but we need to be very careful
* that we do not loose a call that is coming via a callback
* call as this will stall the binlog catchup process.
*
* We don't want to simply use a traditional mutex here for
* the loop, since this would block a MaxScale thread for
* an unacceptable length of time.
*
* We have two status bits, the CS_READING that says we are
* in the outer loop and the CS_INNERLOOP, to say we are in
* the inner loop.
*
* If just CS_READING is set the other thread may be about to
* enter the inner loop or may be about to exit the function
* completely. Therefore we have to wait to see if CS_READING
* is cleared or CS_INNERLOOP is set.
*
* If CS_READING gets cleared then this thread should proceed
* into the loop.
*
* If CS_INNERLOOP get's set then this thread does not need to
* proceed.
*
* If CS_READING is not set then this thread simply enters the
* loop.
*/
req.tv_sec = 0;
req.tv_nsec = 1000;
spinlock_acquire(&slave->catch_lock);
if (slave->cstate & CS_UPTODATE)
if (slave->cstate & CS_BUSY)
{
LOGIF(LM, (skygw_log_write(LOGFILE_MESSAGE,
"blr_slave_catchup called with up to date slave %d at "
"%s@%d. Reading position %s@%d\n",
slave->serverid, slave->binlogfile,
slave->binlog_pos, router->binlog_name,
router->binlog_position)));
slave->stats.n_alreadyupd++;
spinlock_release(&slave->catch_lock);
return 1;
return 0;
}
while (slave->cstate & CS_READING)
{
// Wait until we know what the other thread is doing
while ((slave->cstate & (CS_READING|CS_INNERLOOP)) == CS_READING)
{
spinlock_release(&slave->catch_lock);
nanosleep(&req, NULL);
spinlock_acquire(&slave->catch_lock);
}
// Other thread is in the innerloop
if ((slave->cstate & (CS_READING|CS_INNERLOOP)) == (CS_READING|CS_INNERLOOP))
{
spinlock_release(&slave->catch_lock);
LOGIF(LM, (skygw_log_write(
LOGFILE_MESSAGE,
"blr_slave_catchup thread returning due to "
"lock being held by another thread. %s@%d\n",
slave->binlogfile,
slave->binlog_pos)));
slave->stats.n_catchupnr++;
return 1; // We cheat here and return 1 because otherwise
// an error would be sent and we do not want that
}
/* Release the lock for a short time to allow the other
* thread to exit the outer reading loop.
*/
spinlock_release(&slave->catch_lock);
nanosleep(&req, NULL);
spinlock_acquire(&slave->catch_lock);
}
if (slave->pthread)
LOGIF(LD, (skygw_log_write(LOGFILE_DEBUG, "Multiple threads sending to same thread.\n")));
slave->pthread = pthread_self();
slave->cstate |= CS_READING;
slave->cstate |= CS_BUSY;
spinlock_release(&slave->catch_lock);
if (DCB_ABOVE_HIGH_WATER(slave->dcb))
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, "blr_slave_catchup above high water on entry.\n")));
do {
if ((fd = blr_open_binlog(router, slave->binlogfile)) == -1)
if (slave->file == NULL)
{
rotating = router->rotating;
if ((slave->file = blr_open_binlog(router, slave->binlogfile)) == NULL)
{
spinlock_acquire(&slave->catch_lock);
slave->cstate &= ~CS_READING;
spinlock_release(&slave->catch_lock);
LOGIF(LE, (skygw_log_write(
if (rotating)
{
spinlock_acquire(&slave->catch_lock);
slave->cstate |= CS_EXPECTCB;
slave->cstate &= ~CS_BUSY;
spinlock_release(&slave->catch_lock);
poll_fake_write_event(slave->dcb);
return rval;
}
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"blr_slave_catchup failed to open binlog file %s\n",
"blr_slave_catchup failed to open binlog file %s",
slave->binlogfile)));
slave->cstate &= ~CS_BUSY;
slave->state = BLRS_ERRORED;
dcb_close(slave->dcb);
return 0;
}
slave->stats.n_bursts++;
spinlock_acquire(&slave->catch_lock);
slave->cstate |= CS_INNERLOOP;
spinlock_release(&slave->catch_lock);
while ((!DCB_ABOVE_HIGH_WATER(slave->dcb)) &&
(record = blr_read_binlog(fd, slave->binlog_pos, &hdr)) != NULL)
}
slave->stats.n_bursts++;
while (burst-- && burst_size > 0 &&
(record = blr_read_binlog(router, slave->file, slave->binlog_pos, &hdr)) != NULL)
{
head = gwbuf_alloc(5);
ptr = GWBUF_DATA(head);
encode_value(ptr, hdr.event_size + 1, 24);
ptr += 3;
*ptr++ = slave->seqno++;
*ptr++ = 0; // OK
head = gwbuf_append(head, record);
if (hdr.event_type == ROTATE_EVENT)
{
if (hdr.event_size > DEF_HIGH_WATER) slave->stats.n_above++;
head = gwbuf_alloc(5);
ptr = GWBUF_DATA(head);
encode_value(ptr, hdr.event_size + 1, 24);
ptr += 3;
*ptr++ = slave->seqno++;
*ptr++ = 0; // OK
head = gwbuf_append(head, record);
if (hdr.event_type == ROTATE_EVENT)
unsigned long beat1 = hkheartbeat;
blr_close_binlog(router, slave->file);
if (hkheartbeat - beat1 > 1) LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR, "blr_close_binlog took %d beats",
hkheartbeat - beat1)));
blr_slave_rotate(slave, GWBUF_DATA(record));
beat1 = hkheartbeat;
if ((slave->file = blr_open_binlog(router, slave->binlogfile)) == NULL)
{
close(fd);
blr_slave_rotate(slave, GWBUF_DATA(record));
if ((fd = blr_open_binlog(router, slave->binlogfile)) == -1)
if (rotating)
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"blr_slave_catchup failed to open binlog file %s\n",
slave->binlogfile)));
break;
spinlock_acquire(&slave->catch_lock);
slave->cstate |= CS_EXPECTCB;
slave->cstate &= ~CS_BUSY;
spinlock_release(&slave->catch_lock);
poll_fake_write_event(slave->dcb);
return rval;
}
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"blr_slave_catchup failed to open binlog file %s",
slave->binlogfile)));
slave->state = BLRS_ERRORED;
dcb_close(slave->dcb);
break;
}
written = slave->dcb->func.write(slave->dcb, head);
if (written && hdr.event_type != ROTATE_EVENT)
{
slave->binlog_pos = hdr.next_pos;
}
rval = written;
slave->stats.n_events++;
burst++;
if (hkheartbeat - beat1 > 1) LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR, "blr_open_binlog took %d beats",
hkheartbeat - beat1)));
}
if (record == NULL)
slave->stats.n_failed_read++;
spinlock_acquire(&slave->catch_lock);
slave->cstate &= ~CS_INNERLOOP;
spinlock_release(&slave->catch_lock);
written = slave->dcb->func.write(slave->dcb, head);
if (written && hdr.event_type != ROTATE_EVENT)
{
slave->binlog_pos = hdr.next_pos;
}
rval = written;
slave->stats.n_events++;
burst_size -= hdr.event_size;
}
if (record == NULL)
slave->stats.n_failed_read++;
spinlock_acquire(&slave->catch_lock);
slave->cstate &= ~CS_BUSY;
spinlock_release(&slave->catch_lock);
close(fd);
} while (record && DCB_BELOW_LOW_WATER(slave->dcb));
if (record)
{
slave->stats.n_flows++;
spinlock_acquire(&slave->catch_lock);
slave->cstate |= CS_EXPECTCB;
spinlock_release(&slave->catch_lock);
poll_fake_write_event(slave->dcb);
}
else if (slave->binlog_pos == router->binlog_position &&
strcmp(slave->binlogfile, router->binlog_name) == 0)
{
int state_change = 0;
spinlock_acquire(&router->binlog_lock);
spinlock_acquire(&slave->catch_lock);
/*
* Now check again since we hold the router->binlog_lock
* and slave->catch_lock.
*/
if (slave->binlog_pos != router->binlog_position ||
strcmp(slave->binlogfile, router->binlog_name) != 0)
{
slave->cstate &= ~CS_UPTODATE;
slave->cstate |= CS_EXPECTCB;
spinlock_release(&slave->catch_lock);
spinlock_release(&router->binlog_lock);
poll_fake_write_event(slave->dcb);
}
else
{
if ((slave->cstate & CS_UPTODATE) == 0)
{
slave->stats.n_upd++;
slave->cstate |= CS_UPTODATE;
spinlock_release(&slave->catch_lock);
spinlock_release(&router->binlog_lock);
state_change = 1;
}
}
if (state_change)
{
LOGIF(LM, (skygw_log_write(LOGFILE_MESSAGE,
"%s: Slave %s is up to date %s, %u.",
router->service->name,
slave->dcb->remote,
slave->binlogfile, slave->binlog_pos)));
}
}
else
{
int state_change = 0;
spinlock_acquire(&slave->catch_lock);
if ((slave->cstate & CS_UPTODATE) == 0)
if (slave->binlog_pos >= blr_file_size(slave->file)
&& router->rotating == 0
&& strcmp(router->binlog_name, slave->binlogfile) != 0
&& blr_master_connected(router))
{
slave->stats.n_upd++;
slave->cstate |= CS_UPTODATE;
state_change = 1;
/* We may have reached the end of file of a non-current
* binlog file.
*
* Note if the master is rotating there is a window during
* which the rotate event has been written to the old binlog
* but the new binlog file has not yet been created. Therefore
* we ignore these issues during the rotate processing.
*/
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Slave reached end of file for binlong file %s at %u "
"which is not the file currently being downloaded. "
"Master binlog is %s, %lu.",
slave->binlogfile, slave->binlog_pos,
router->binlog_name, router->binlog_position)));
if (blr_slave_fake_rotate(router, slave))
{
spinlock_acquire(&slave->catch_lock);
slave->cstate |= CS_EXPECTCB;
spinlock_release(&slave->catch_lock);
poll_fake_write_event(slave->dcb);
}
else
{
slave->state = BLRS_ERRORED;
dcb_close(slave->dcb);
}
}
else
{
spinlock_acquire(&slave->catch_lock);
slave->cstate |= CS_EXPECTCB;
spinlock_release(&slave->catch_lock);
poll_fake_write_event(slave->dcb);
}
spinlock_release(&slave->catch_lock);
if (state_change)
LOGIF(LM, (skygw_log_write(LOGFILE_MESSAGE,
"blr_slave_catchup slave is up to date %s, %u\n",
slave->binlogfile, slave->binlog_pos)));
}
spinlock_acquire(&slave->catch_lock);
#if 0
if (slave->pthread != pthread_self())
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, "Multple threads in catchup for same slave: %x and %x\n", slave->pthread, pthread_self())));
abort();
}
#endif
slave->pthread = 0;
#if 0
if (DCB_BELOW_LOW_WATER(slave->dcb) && slave->binlog_pos != router->binlog_position) abort();
#endif
slave->cstate &= ~CS_READING;
spinlock_release(&slave->catch_lock);
if (DCB_BELOW_LOW_WATER(slave->dcb) && slave->binlog_pos != router->binlog_position)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, "Expected to be above low water\n")));
goto doitagain;
}
return rval;
}
@ -896,7 +901,7 @@ goto doitagain;
* @param reason The reason the callback was called
* @param data The user data, in this case the server structure
*/
static int
int
blr_slave_callback(DCB *dcb, DCB_REASON reason, void *data)
{
ROUTER_SLAVE *slave = (ROUTER_SLAVE *)data;
@ -904,11 +909,19 @@ ROUTER_INSTANCE *router = slave->router;
if (reason == DCB_REASON_DRAINED)
{
if (slave->state == BLRS_DUMPING &&
slave->binlog_pos != router->binlog_position)
if (slave->state == BLRS_DUMPING)
{
spinlock_acquire(&slave->catch_lock);
slave->cstate &= ~(CS_UPTODATE|CS_EXPECTCB);
spinlock_release(&slave->catch_lock);
slave->stats.n_dcb++;
blr_slave_catchup(router, slave);
blr_slave_catchup(router, slave, true);
}
else
{
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG, "Ignored callback due to slave state %s",
blrs_states[slave->state])));
}
}
@ -917,7 +930,7 @@ ROUTER_INSTANCE *router = slave->router;
if (slave->state == BLRS_DUMPING)
{
slave->stats.n_cb++;
blr_slave_catchup(router, slave);
blr_slave_catchup(router, slave, true);
}
else
{
@ -931,14 +944,90 @@ ROUTER_INSTANCE *router = slave->router;
* Rotate the slave to the new binlog file
*
* @param slave The slave instance
* @param ptr The rotate event (minux header and OK byte)
* @param ptr The rotate event (minus header and OK byte)
*/
void
blr_slave_rotate(ROUTER_SLAVE *slave, uint8_t *ptr)
{
int len = EXTRACT24(ptr + 9); // Extract the event length
len = len - (19 + 8 + 4); // Remove length of header, checksum and position
if (len > BINLOG_FNAMELEN)
len = BINLOG_FNAMELEN;
ptr += 19; // Skip header
slave->binlog_pos = extract_field(ptr, 32);
slave->binlog_pos += (extract_field(ptr+4, 32) << 32);
memcpy(slave->binlogfile, ptr + 8, BINLOG_FNAMELEN);
slave->binlogfile[BINLOG_FNAMELEN] = 0;
memcpy(slave->binlogfile, ptr + 8, len);
slave->binlogfile[len] = 0;
}
/**
* Generate an internal rotate event that we can use to cause the slave to move beyond
* a binlog file that is misisng the rotate eent at the end.
*
* @param router The router instance
* @param slave The slave to rotate
* @return Non-zero if the rotate took place
*/
static int
blr_slave_fake_rotate(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave)
{
char *sptr;
int filenum;
GWBUF *resp;
uint8_t *ptr;
int len, binlognamelen;
REP_HEADER hdr;
uint32_t chksum;
if ((sptr = strrchr(slave->binlogfile, '.')) == NULL)
return 0;
blr_close_binlog(router, slave->file);
filenum = atoi(sptr + 1);
sprintf(slave->binlogfile, BINLOG_NAMEFMT, router->fileroot, filenum + 1);
slave->binlog_pos = 4;
if ((slave->file = blr_open_binlog(router, slave->binlogfile)) == NULL)
return 0;
binlognamelen = strlen(slave->binlogfile);
if (slave->nocrc)
len = 19 + 8 + binlognamelen;
else
len = 19 + 8 + 4 + binlognamelen;
// Build a fake rotate event
resp = gwbuf_alloc(len + 5);
hdr.payload_len = len + 1;
hdr.seqno = slave->seqno++;
hdr.ok = 0;
hdr.timestamp = 0L;
hdr.event_type = ROTATE_EVENT;
hdr.serverid = router->masterid;
hdr.event_size = len;
hdr.next_pos = 0;
hdr.flags = 0x20;
ptr = blr_build_header(resp, &hdr);
encode_value(ptr, slave->binlog_pos, 64);
ptr += 8;
memcpy(ptr, slave->binlogfile, binlognamelen);
ptr += binlognamelen;
if (!slave->nocrc)
{
/*
* Now add the CRC to the fake binlog rotate event.
*
* The algorithm is first to compute the checksum of an empty buffer
* and then the checksum of the event portion of the message, ie we do not
* include the length, sequence number and ok byte that makes up the first
* 5 bytes of the message. We also do not include the 4 byte checksum itself.
*/
chksum = crc32(0L, NULL, 0);
chksum = crc32(chksum, GWBUF_DATA(resp) + 5, hdr.event_size - 4);
encode_value(ptr, chksum, 32);
}
slave->dcb->func.write(slave->dcb, resp);
return 1;
}

View File

@ -53,7 +53,10 @@ MODULE_INFO info = {
"The admin user interface"
};
extern int lm_enabled_logfiles_bitmask;
/** Defined in log_manager.cc */
extern int lm_enabled_logfiles_bitmask;
extern size_t log_ses_count[];
extern __thread log_info_t tls_log_info;
static char *version_str = "V1.0.0";

View File

@ -52,7 +52,10 @@ MODULE_INFO info = {
"The debug user interface"
};
extern int lm_enabled_logfiles_bitmask;
/** Defined in log_manager.cc */
extern int lm_enabled_logfiles_bitmask;
extern size_t log_ses_count[];
extern __thread log_info_t tls_log_info;
static char *version_str = "V1.1.1";

View File

@ -41,6 +41,7 @@
* than simply addresses
* 23/05/14 Mark Riddoch Added support for developer and user modes
* 29/05/14 Mark Riddoch Add Filter support
* 16/10/14 Mark Riddoch Add show eventq
*
* @endverbatim
*/
@ -66,6 +67,8 @@
#include <adminusers.h>
#include <monitor.h>
#include <debugcli.h>
#include <poll.h>
#include <housekeeper.h>
#include <skygw_utils.h>
#include <log_manager.h>
@ -81,6 +84,7 @@
#define ARG_TYPE_DCB 7
#define ARG_TYPE_MONITOR 8
#define ARG_TYPE_FILTER 9
#define ARG_TYPE_NUMERIC 10
/**
* The subcommand structure
@ -117,6 +121,14 @@ struct subcommand showoptions[] = {
"Show the poll statistics",
"Show the poll statistics",
{0, 0, 0} },
{ "eventq", 0, dShowEventQ,
"Show the queue of events waiting to be processed",
"Show the queue of events waiting to be processed",
{0, 0, 0} },
{ "eventstats", 0, dShowEventStats,
"Show the event statistics",
"Show the event statistics",
{0, 0, 0} },
{ "filter", 1, dprintFilter,
"Show details of a filter, called with a filter name",
"Show details of a filter, called with the address of a filter",
@ -161,6 +173,10 @@ struct subcommand showoptions[] = {
"Show all active sessions in MaxScale",
"Show all active sessions in MaxScale",
{0, 0, 0} },
{ "tasks", 0, hkshow_tasks,
"Show all active housekeeper tasks in MaxScale",
"Show all active housekeeper tasks in MaxScale",
{0, 0, 0} },
{ "threads", 0, dShowThreads,
"Show the status of the polling threads in MaxScale",
"Show the status of the polling threads in MaxScale",
@ -282,6 +298,8 @@ struct subcommand restartoptions[] = {
};
static void set_server(DCB *dcb, SERVER *server, char *bit);
static void set_pollsleep(DCB *dcb, int);
static void set_nbpoll(DCB *dcb, int);
/**
* The subcommands of the set command
*/
@ -290,6 +308,15 @@ struct subcommand setoptions[] = {
"Set the status of a server. E.g. set server dbnode4 master",
"Set the status of a server. E.g. set server 0x4838320 master",
{ARG_TYPE_SERVER, ARG_TYPE_STRING, 0} },
{ "pollsleep", 1, set_pollsleep,
"Set the maximum poll sleep period in milliseconds",
"Set the maximum poll sleep period in milliseconds",
{ARG_TYPE_NUMERIC, 0, 0} },
{ "nbpolls", 1, set_nbpoll,
"Set the number of non-blocking polls",
"Set the number of non-blocking polls",
{ARG_TYPE_NUMERIC, 0, 0} },
{ NULL, 0, NULL, NULL, NULL,
{0, 0, 0} }
};
@ -328,6 +355,8 @@ struct subcommand reloadoptions[] = {
static void enable_log_action(DCB *, char *);
static void disable_log_action(DCB *, char *);
static void enable_sess_log_action(DCB *dcb, char *arg1, char *arg2);
static void disable_sess_log_action(DCB *dcb, char *arg1, char *arg2);
static void enable_monitor_replication_heartbeat(DCB *dcb, MONITOR *monitor);
static void disable_monitor_replication_heartbeat(DCB *dcb, MONITOR *monitor);
static void enable_service_root(DCB *dcb, SERVICE *service);
@ -354,6 +383,16 @@ struct subcommand enableoptions[] = {
"Enable Log options for MaxScale, options trace | error | "
"message E.g. enable log message.",
{ARG_TYPE_STRING, 0, 0}
},
{
"sessionlog",
2,
enable_sess_log_action,
"Enable Log options for a single session. Usage: enable sessionlog [trace | error | "
"message | debug] <session id>\t E.g. enable sessionlog message 123.",
"Enable Log options for a single session. Usage: enable sessionlog [trace | error | "
"message | debug] <session id>\t E.g. enable sessionlog message 123.",
{ARG_TYPE_STRING, ARG_TYPE_STRING, 0}
},
{
"root",
@ -374,6 +413,7 @@ struct subcommand enableoptions[] = {
};
/**
* * The subcommands of the disable command
* */
@ -396,6 +436,16 @@ struct subcommand disableoptions[] = {
"E.g. disable log debug",
{ARG_TYPE_STRING, 0, 0}
},
{
"sessionlog",
2,
disable_sess_log_action,
"Disable Log options for a single session. Usage: disable sessionlog [trace | error | "
"message | debug] <session id>\t E.g. disable sessionlog message 123.",
"Disable Log options for a single session. Usage: disable sessionlog [trace | error | "
"message | debug] <session id>\t E.g. disable sessionlog message 123.",
{ARG_TYPE_STRING, ARG_TYPE_STRING, 0}
},
{
"root",
1,
@ -649,6 +699,16 @@ SERVICE *service;
if (mode == CLIM_USER || (rval = (unsigned long)strtol(arg, NULL, 0)) == 0)
rval = (unsigned long)filter_find(arg);
return rval;
case ARG_TYPE_NUMERIC:
{
int i;
for (i = 0; arg[i]; i++)
{
if (arg[i] < '0' || arg[i] > '9')
return 0;
}
return atoi(arg);
}
}
return 0;
}
@ -676,6 +736,7 @@ char *args[MAXARGS + 1];
unsigned long arg1, arg2, arg3;
int in_quotes = 0, escape_next = 0;
char *ptr, *lptr;
bool in_space = false;
args[0] = cli->cmdbuf;
ptr = args[0];
@ -687,6 +748,7 @@ char *ptr, *lptr;
* the use of double quotes.
* The array args contains the broken down words, one per index.
*/
while (*ptr)
{
if (escape_next)
@ -699,9 +761,15 @@ char *ptr, *lptr;
escape_next = 1;
ptr++;
}
else if (in_quotes == 0 && (*ptr == ' ' || *ptr == '\t' || *ptr == '\r' || *ptr == '\n'))
else if (in_quotes == 0 && ((in_space = *ptr == ' ') || *ptr == '\t' || *ptr == '\r' || *ptr == '\n'))
{
*lptr = 0;
if(!in_space){
break;
}
if (args[i] == ptr)
args[i] = ptr + 1;
else
@ -1140,6 +1208,91 @@ disable_service_root(DCB *dcb, SERVICE *service)
serviceEnableRootUser(service, 0);
}
/**
* Enables a log for a single session
* @param session The session in question
* @param dcb Client DCB
* @param type Which log to enable
*/
static void enable_sess_log_action(DCB *dcb, char *arg1, char *arg2)
{
logfile_id_t type;
size_t id = 0;
int max_len = strlen("message");
SESSION* session = get_all_sessions();
ss_dassert(arg1 != NULL && arg2 != NULL && session != NULL);
if (strncmp(arg1, "debug", max_len) == 0) {
type = LOGFILE_DEBUG;
} else if (strncmp(arg1, "trace", max_len) == 0) {
type = LOGFILE_TRACE;
} else if (strncmp(arg1, "error", max_len) == 0) {
type = LOGFILE_ERROR;
} else if (strncmp(arg1, "message", max_len) == 0) {
type = LOGFILE_MESSAGE;
} else {
dcb_printf(dcb, "%s is not supported for enable log\n", arg1);
return ;
}
id = (size_t)strtol(arg2,0,0);
while(session)
{
if(session->ses_id == id)
{
session_enable_log(session,type);
return;
}
session = session->next;
}
dcb_printf(dcb, "Session not found: %s\n", arg2);
}
/**
* Disables a log for a single session
* @param session The session in question
* @param dcb Client DCB
* @param type Which log to disable
*/
static void disable_sess_log_action(DCB *dcb, char *arg1, char *arg2)
{
logfile_id_t type;
int id = 0;
int max_len = strlen("message");
SESSION* session = get_all_sessions();
ss_dassert(arg1 != NULL && arg2 != NULL && session != NULL);
if (strncmp(arg1, "debug", max_len) == 0) {
type = LOGFILE_DEBUG;
} else if (strncmp(arg1, "trace", max_len) == 0) {
type = LOGFILE_TRACE;
} else if (strncmp(arg1, "error", max_len) == 0) {
type = LOGFILE_ERROR;
} else if (strncmp(arg1, "message", max_len) == 0) {
type = LOGFILE_MESSAGE;
} else {
dcb_printf(dcb, "%s is not supported for disable log\n", arg1);
return ;
}
id = (size_t)strtol(arg2,0,0);
while(session)
{
if(session->ses_id == id)
{
session_disable_log(session,type);
return;
}
session = session->next;
}
dcb_printf(dcb, "Session not found: %s\n", arg2);
}
/**
* The log enable action
@ -1161,7 +1314,7 @@ static void enable_log_action(DCB *dcb, char *arg1) {
dcb_printf(dcb, "%s is not supported for enable log\n", arg1);
return ;
}
skygw_log_enable(type);
}
@ -1189,6 +1342,30 @@ static void disable_log_action(DCB *dcb, char *arg1) {
skygw_log_disable(type);
}
/**
* Set the duration of the sleep passed to the poll wait
*
* @param dcb DCB for output
* @param sleeptime Sleep time in milliseconds
*/
static void
set_pollsleep(DCB *dcb, int sleeptime)
{
poll_set_maxwait(sleeptime);
}
/**
* Set the number of non-blockign spins to make
*
* @param dcb DCB for output
* @param nb Number of spins
*/
static void
set_nbpoll(DCB *dcb, int nb)
{
poll_set_nonblocking_polls(nb);
}
#if defined(FAKE_CODE)
static void fail_backendfd(void)
{

View File

@ -89,7 +89,10 @@
#include <mysql_client_server_protocol.h>
extern int lm_enabled_logfiles_bitmask;
/** Defined in log_manager.cc */
extern int lm_enabled_logfiles_bitmask;
extern size_t log_ses_count[];
extern __thread log_info_t tls_log_info;
MODULE_INFO info = {
MODULE_API_ROUTER,

View File

@ -45,9 +45,10 @@ MODULE_INFO info = {
# include <mysql_client_server_protocol.h>
#endif
extern int lm_enabled_logfiles_bitmask;
/** Defined in log_manager.cc */
extern int lm_enabled_logfiles_bitmask;
extern size_t log_ses_count[];
extern __thread log_info_t tls_log_info;
/**
* @file readwritesplit.c The entry points for the read/write query splitting
* router module.
@ -1093,7 +1094,6 @@ static bool get_dcb(
*/
if (master_bref == NULL)
{
succp = false;
goto return_succp;
}
#if defined(SS_DEBUG)
@ -1164,30 +1164,20 @@ static bool get_dcb(
}
/**
* If there are no candidates yet accept both master or
* slave. If candidate is master, any slave replaces it.
* slave.
*/
else if (candidate_bref == NULL ||
(SERVER_IS_MASTER(candidate_bref->bref_backend->backend_server) &&
SERVER_IS_SLAVE(b->backend_server)))
else if (candidate_bref == NULL)
{
/**
* Ensure that master has not changed dunring
* session and abort if it has.
*/
if (SERVER_IS_MASTER(b->backend_server))
if (SERVER_IS_MASTER(b->backend_server) &&
&backend_ref[i] == master_bref)
{
if (candidate_bref != master_bref)
{
/** Log master failure */
succp = false;
break;
}
else
{
/** found master */
candidate_bref = &backend_ref[i];
succp = true;
}
/** found master */
candidate_bref = &backend_ref[i];
succp = true;
}
/**
* Ensure that max replication lag is not set
@ -1203,6 +1193,20 @@ static bool get_dcb(
succp = true;
}
}
/**
* If candidate is master, any slave which doesn't break
* replication lag limits replaces it.
*/
else if (SERVER_IS_MASTER(candidate_bref->bref_backend->backend_server) &&
SERVER_IS_SLAVE(b->backend_server) &&
(max_rlag == MAX_RLAG_UNDEFINED ||
(b->backend_server->rlag != MAX_RLAG_NOT_AVAILABLE &&
b->backend_server->rlag <= max_rlag)))
{
/** found slave */
candidate_bref = &backend_ref[i];
succp = true;
}
/**
* When candidate exists, compare it against the current
* backend and update assign it to new candidate if
@ -1946,7 +1950,7 @@ static int routeQuery(
char* contentstr = strndup(data, len);
char* qtypestr = skygw_get_qtype_str(qtype);
LOGIF(LT, (skygw_log_write(
skygw_log_write(
LOGFILE_TRACE,
"> Autocommit: %s, trx is %s, cmd: %s, type: %s, "
"stmt: %s%s %s",
@ -1956,7 +1960,7 @@ static int routeQuery(
(qtypestr==NULL ? "N/A" : qtypestr),
contentstr,
(querybuf->hint == NULL ? "" : ", Hint:"),
(querybuf->hint == NULL ? "" : STRHINTTYPE(querybuf->hint->type)))));
(querybuf->hint == NULL ? "" : STRHINTTYPE(querybuf->hint->type)));
free(contentstr);
free(qtypestr);