Merge branch 'blr' into release-1.0GA
Conflicts: server/core/service.c
This commit is contained in:
commit
a655e394ac
@ -219,7 +219,7 @@ HASHTABLE *oldresources;
|
||||
int add_mysql_users_with_host_ipv4(USERS *users, char *user, char *host, char *passwd, char *anydb, char *db) {
|
||||
struct sockaddr_in serv_addr;
|
||||
MYSQL_USER_HOST key;
|
||||
char ret_ip[INET_ADDRSTRLEN + 1]="";
|
||||
char ret_ip[400]="";
|
||||
int ret = 0;
|
||||
|
||||
if (users == NULL || user == NULL || host == NULL) {
|
||||
|
@ -81,7 +81,7 @@ setipaddress(struct in_addr *a, char *p) {
|
||||
if ((rc = getaddrinfo(p, NULL, &hint, &ai)) != 0) {
|
||||
LOGIF(LE, (skygw_log_write_flush(
|
||||
LOGFILE_ERROR,
|
||||
"Error : getaddrinfo failed for [%s] due [%s]",
|
||||
"Error: Failed to obtain address for host %s, %s",
|
||||
p,
|
||||
gai_strerror(rc))));
|
||||
|
||||
@ -94,7 +94,7 @@ setipaddress(struct in_addr *a, char *p) {
|
||||
if ((rc = getaddrinfo(p, NULL, &hint, &ai)) != 0) {
|
||||
LOGIF(LE, (skygw_log_write_flush(
|
||||
LOGFILE_ERROR,
|
||||
"Error : getaddrinfo failed for [%s] due [%s]",
|
||||
"Error: Failed to obtain address for host %s, %s",
|
||||
p,
|
||||
gai_strerror(rc))));
|
||||
|
||||
|
@ -349,14 +349,14 @@ serviceStart(SERVICE *service)
|
||||
SERV_PROTOCOL *port;
|
||||
int listeners = 0;
|
||||
|
||||
if((service->router_instance = service->router->createInstance(service,
|
||||
service->routerOptions)) == NULL)
|
||||
if ((service->router_instance = service->router->createInstance(service,
|
||||
service->routerOptions)) == NULL)
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write(
|
||||
LOGFILE_ERROR,
|
||||
"Error : Failed to start router for service '%s'.",
|
||||
service->name)));
|
||||
return listeners;
|
||||
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
|
||||
"%s: Failed to create router instance for service. Service not started.",
|
||||
service->name)));
|
||||
service->state = SERVICE_STATE_FAILED;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
port = service->ports;
|
||||
@ -366,7 +366,10 @@ int listeners = 0;
|
||||
port = port->next;
|
||||
}
|
||||
if (listeners)
|
||||
{
|
||||
service->state = SERVICE_STATE_STARTED;
|
||||
service->stats.started = time(0);
|
||||
}
|
||||
|
||||
return listeners;
|
||||
}
|
||||
@ -445,6 +448,7 @@ int listeners = 0;
|
||||
|
||||
port = port->next;
|
||||
}
|
||||
service->state = SERVICE_STATE_STOPPED;
|
||||
|
||||
return listeners;
|
||||
}
|
||||
@ -916,7 +920,22 @@ int i;
|
||||
service->name);
|
||||
dcb_printf(dcb, "\tRouter: %s (%p)\n",
|
||||
service->routerModule, service->router);
|
||||
if (service->router)
|
||||
switch (service->state)
|
||||
{
|
||||
case SERVICE_STATE_STARTED:
|
||||
dcb_printf(dcb, "\tState: Started\n");
|
||||
break;
|
||||
case SERVICE_STATE_STOPPED:
|
||||
dcb_printf(dcb, "\tState: Stopped\n");
|
||||
break;
|
||||
case SERVICE_STATE_FAILED:
|
||||
dcb_printf(dcb, "\tState: Failed\n");
|
||||
break;
|
||||
case SERVICE_STATE_ALLOC:
|
||||
dcb_printf(dcb, "\tState: Allocated\n");
|
||||
break;
|
||||
}
|
||||
if (service->router && service->router_instance)
|
||||
service->router->diagnostics(service->router_instance, dcb);
|
||||
dcb_printf(dcb, "\tStarted: %s",
|
||||
asctime_r(localtime_r(&service->stats.started, &result), timebuf));
|
||||
|
@ -144,6 +144,8 @@ typedef enum count_spec_t {COUNT_NONE=0, COUNT_ATLEAST, COUNT_EXACT, COUNT_ATMOS
|
||||
|
||||
#define SERVICE_STATE_ALLOC 1 /**< The service has been allocated */
|
||||
#define SERVICE_STATE_STARTED 2 /**< The service has been started */
|
||||
#define SERVICE_STATE_FAILED 3 /**< The service failed to start */
|
||||
#define SERVICE_STATE_STOPPED 4 /**< The service has been stopped */
|
||||
|
||||
extern SERVICE *service_alloc(const char *, const char *);
|
||||
extern int service_free(SERVICE *);
|
||||
|
@ -161,6 +161,7 @@ typedef struct router_slave {
|
||||
int binlog_pos; /*< Binlog position for this slave */
|
||||
char binlogfile[BINLOG_FNAMELEN+1];
|
||||
/*< Current binlog file for this slave */
|
||||
char *uuid; /*< Slave UUID */
|
||||
BLFILE *file; /*< Currently open binlog file */
|
||||
int serverid; /*< Server-id of the slave */
|
||||
char *hostname; /*< Hostname of the slave, if known */
|
||||
@ -227,6 +228,8 @@ typedef struct {
|
||||
GWBUF *utf8; /*< Set NAMES utf8 */
|
||||
GWBUF *select1; /*< select 1 */
|
||||
GWBUF *selectver; /*< select version() */
|
||||
GWBUF *selectvercom; /*< select @@version_comment */
|
||||
GWBUF *selecthostname;/*< select @@hostname */
|
||||
uint8_t *fde_event; /*< Format Description Event */
|
||||
int fde_len; /*< Length of fde_event */
|
||||
} MASTER_RESPONSES;
|
||||
@ -300,16 +303,19 @@ typedef struct router_instance {
|
||||
#define BLRM_UTF8 0x000C
|
||||
#define BLRM_SELECT1 0x000D
|
||||
#define BLRM_SELECTVER 0x000E
|
||||
#define BLRM_REGISTER 0x000F
|
||||
#define BLRM_BINLOGDUMP 0x0010
|
||||
#define BLRM_SELECTVERCOM 0x000F
|
||||
#define BLRM_SELECTHOSTNAME 0x0010
|
||||
#define BLRM_REGISTER 0x0011
|
||||
#define BLRM_BINLOGDUMP 0x0012
|
||||
|
||||
#define BLRM_MAXSTATE 0x0010
|
||||
#define BLRM_MAXSTATE 0x0012
|
||||
|
||||
static char *blrm_states[] = { "Unconnected", "Connecting", "Authenticated", "Timestamp retrieval",
|
||||
"Server ID retrieval", "HeartBeat Period setup", "binlog checksum config",
|
||||
"binlog checksum rerieval", "GTID Mode retrieval", "Master UUID retrieval",
|
||||
"Set Slave UUID", "Set Names latin1", "Set Names utf8", "select 1",
|
||||
"select version()", "Register slave", "Binlog Dump" };
|
||||
"select version()", "select @@version_comment", "select @@hostname",
|
||||
"Register slave", "Binlog Dump" };
|
||||
|
||||
#define BLRS_CREATED 0x0000
|
||||
#define BLRS_UNREGISTERED 0x0001
|
||||
@ -338,6 +344,8 @@ static char *blrs_states[] = { "Created", "Unregistered", "Registered",
|
||||
*/
|
||||
#define COM_QUIT 0x01
|
||||
#define COM_QUERY 0x03
|
||||
#define COM_STATISTICS 0x09
|
||||
#define COM_PING 0x0e
|
||||
#define COM_REGISTER_SLAVE 0x15
|
||||
#define COM_BINLOG_DUMP 0x12
|
||||
|
||||
@ -429,9 +437,9 @@ extern void blr_slave_rotate(ROUTER_SLAVE *slave, uint8_t *ptr);
|
||||
extern int blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large);
|
||||
extern void blr_init_cache(ROUTER_INSTANCE *);
|
||||
|
||||
extern void blr_file_init(ROUTER_INSTANCE *);
|
||||
extern void blr_write_binlog_record(ROUTER_INSTANCE *, REP_HEADER *,uint8_t *);
|
||||
extern void blr_file_rotate(ROUTER_INSTANCE *, char *, uint64_t);
|
||||
extern int blr_file_init(ROUTER_INSTANCE *);
|
||||
extern int blr_write_binlog_record(ROUTER_INSTANCE *, REP_HEADER *,uint8_t *);
|
||||
extern int blr_file_rotate(ROUTER_INSTANCE *, char *, uint64_t);
|
||||
extern void blr_file_flush(ROUTER_INSTANCE *);
|
||||
extern BLFILE *blr_open_binlog(ROUTER_INSTANCE *, char *);
|
||||
extern GWBUF *blr_read_binlog(ROUTER_INSTANCE *, BLFILE *, unsigned int, REP_HEADER *);
|
||||
|
@ -47,7 +47,7 @@ CLIOBJ=$(CLISRCS:.c=.o)
|
||||
SRCS=$(TESTSRCS) $(READCONSRCS) $(DEBUGCLISRCS) cli.c
|
||||
OBJ=$(SRCS:.c=.o)
|
||||
LIBS=$(UTILSPATH)/skygw_utils.o -lssl -llog_manager
|
||||
MODULES= libdebugcli.so libreadconnroute.so libtestroute.so libcli.so
|
||||
MODULES= libdebugcli.so libreadconnroute.so libtestroute.so libcli.so libbinlogrouter.so
|
||||
|
||||
|
||||
all: $(MODULES)
|
||||
@ -68,12 +68,16 @@ libcli.so: $(CLIOBJ)
|
||||
libreadwritesplit.so:
|
||||
(cd readwritesplit; touch depend.mk ; make; cp $@ ..)
|
||||
|
||||
libbinlogrouter.so:
|
||||
(cd binlog; touch depend.mk ; make; cp $@ ..)
|
||||
|
||||
.c.o:
|
||||
$(CC) $(CFLAGS) $< -o $@
|
||||
|
||||
clean:
|
||||
$(DEL) $(OBJ) $(MODULES)
|
||||
(cd readwritesplit; touch depend.mk; make clean)
|
||||
(cd binlog; touch depend.mk; make clean)
|
||||
|
||||
tags:
|
||||
ctags $(SRCS) $(HDRS)
|
||||
@ -83,10 +87,12 @@ depend:
|
||||
@$(DEL) depend.mk
|
||||
cc -M $(CFLAGS) $(SRCS) > depend.mk
|
||||
(cd readwritesplit; touch depend.mk ; make depend)
|
||||
(cd binlog; touch depend.mk ; make depend)
|
||||
|
||||
install: $(MODULES)
|
||||
install -D $(MODULES) $(DEST)/modules
|
||||
(cd readwritesplit; make DEST=$(DEST) install)
|
||||
(cd binlog; make DEST=$(DEST) install)
|
||||
|
||||
cleantests:
|
||||
$(MAKE) -C test cleantests
|
||||
|
@ -332,16 +332,6 @@ int i;
|
||||
inst->fileroot = strdup(BINLOG_NAME_ROOT);
|
||||
}
|
||||
|
||||
/*
|
||||
* We have completed the creation of the instance data, so now
|
||||
* insert this router instance into the linked list of routers
|
||||
* that have been created with this module.
|
||||
*/
|
||||
spinlock_acquire(&instlock);
|
||||
inst->next = instances;
|
||||
instances = inst;
|
||||
spinlock_release(&instlock);
|
||||
|
||||
inst->active_logs = 0;
|
||||
inst->reconnect_pending = 0;
|
||||
inst->handling_threads = 0;
|
||||
@ -353,12 +343,31 @@ int i;
|
||||
/*
|
||||
* Initialise the binlog file and position
|
||||
*/
|
||||
blr_file_init(inst);
|
||||
if (blr_file_init(inst) == 0)
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write(
|
||||
LOGFILE_ERROR,
|
||||
"%s: Service not started due to lack of binlog directory.",
|
||||
service->name)));
|
||||
free(inst);
|
||||
return NULL;
|
||||
}
|
||||
LOGIF(LT, (skygw_log_write(
|
||||
LOGFILE_TRACE,
|
||||
"Binlog router: current binlog file is: %s, current position %u\n",
|
||||
inst->binlog_name, inst->binlog_position)));
|
||||
|
||||
|
||||
/*
|
||||
* We have completed the creation of the instance data, so now
|
||||
* insert this router instance into the linked list of routers
|
||||
* that have been created with this module.
|
||||
*/
|
||||
spinlock_acquire(&instlock);
|
||||
inst->next = instances;
|
||||
instances = inst;
|
||||
spinlock_release(&instlock);
|
||||
|
||||
/*
|
||||
* Initialise the binlog cache for this router instance
|
||||
*/
|
||||
@ -423,6 +432,8 @@ ROUTER_SLAVE *slave;
|
||||
slave->cstate = 0;
|
||||
slave->pthread = 0;
|
||||
slave->overrun = 0;
|
||||
slave->uuid = NULL;
|
||||
slave->hostname = NULL;
|
||||
spinlock_init(&slave->catch_lock);
|
||||
slave->dcb = session->client;
|
||||
slave->router = inst;
|
||||
@ -777,8 +788,10 @@ struct tm tm;
|
||||
session->serverid);
|
||||
if (session->hostname)
|
||||
dcb_printf(dcb, "\t\tHostname: %s\n", session->hostname);
|
||||
if (session->uuid)
|
||||
dcb_printf(dcb, "\t\tSlave UUID: %s\n", session->uuid);
|
||||
dcb_printf(dcb,
|
||||
"\t\tSlave: %d\n",
|
||||
"\t\tSlave: %s\n",
|
||||
session->dcb->remote);
|
||||
dcb_printf(dcb,
|
||||
"\t\tSlave DCB: %p\n",
|
||||
@ -1034,3 +1047,144 @@ ROUTER_SLAVE *slave;
|
||||
}
|
||||
spinlock_release(&router->lock);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return some basic statistics from the router in response to a COM_STATISTICS
|
||||
* request.
|
||||
*
|
||||
* @param router The router instance
|
||||
* @param slave The "slave" connection that requested the statistics
|
||||
* @param queue The statistics request
|
||||
*
|
||||
* @return non-zero on sucessful send
|
||||
*/
|
||||
int
|
||||
blr_statistics(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
|
||||
{
|
||||
char result[1000], *ptr;
|
||||
GWBUF *ret;
|
||||
int len;
|
||||
|
||||
snprintf(result, 1000,
|
||||
"Uptime: %u Threads: %u Events: %u Slaves: %u Master State: %s",
|
||||
time(0) - router->connect_time,
|
||||
config_threadcount(),
|
||||
router->stats.n_binlogs_ses,
|
||||
router->stats.n_slaves,
|
||||
blrm_states[router->master_state]);
|
||||
if ((ret = gwbuf_alloc(4 + strlen(result))) == NULL)
|
||||
return 0;
|
||||
len = strlen(result);
|
||||
ptr = GWBUF_DATA(ret);
|
||||
*ptr++ = len & 0xff;
|
||||
*ptr++ = (len & 0xff00) >> 8;
|
||||
*ptr++ = (len & 0xff0000) >> 16;
|
||||
*ptr++ = 1;
|
||||
strncpy(ptr, result, len);
|
||||
|
||||
return slave->dcb->func.write(slave->dcb, ret);
|
||||
}
|
||||
|
||||
int
|
||||
blr_ping(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
|
||||
{
|
||||
char *ptr;
|
||||
GWBUF *ret;
|
||||
int len;
|
||||
|
||||
if ((ret = gwbuf_alloc(5)) == NULL)
|
||||
return 0;
|
||||
ptr = GWBUF_DATA(ret);
|
||||
*ptr++ = 0x01;
|
||||
*ptr++ = 0;
|
||||
*ptr++ = 0;
|
||||
*ptr++ = 1;
|
||||
*ptr = 0; // OK
|
||||
|
||||
return slave->dcb->func.write(slave->dcb, ret);
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* mysql_send_custom_error
|
||||
*
|
||||
* Send a MySQL protocol Generic ERR message, to the dcb
|
||||
* Note the errno and state are still fixed now
|
||||
*
|
||||
* @param dcb Owner_Dcb Control Block for the connection to which the OK is sent
|
||||
* @param packet_number
|
||||
* @param in_affected_rows
|
||||
* @param msg
|
||||
* @return 1 Non-zero if data was sent
|
||||
*
|
||||
*/
|
||||
int
|
||||
blr_send_custom_error(DCB *dcb, int packet_number, int affected_rows, char *msg)
|
||||
{
|
||||
uint8_t *outbuf = NULL;
|
||||
uint32_t mysql_payload_size = 0;
|
||||
uint8_t mysql_packet_header[4];
|
||||
uint8_t *mysql_payload = NULL;
|
||||
uint8_t field_count = 0;
|
||||
uint8_t mysql_err[2];
|
||||
uint8_t mysql_statemsg[6];
|
||||
unsigned int mysql_errno = 0;
|
||||
const char *mysql_error_msg = NULL;
|
||||
const char *mysql_state = NULL;
|
||||
GWBUF *errbuf = NULL;
|
||||
|
||||
mysql_errno = 2003;
|
||||
mysql_error_msg = "An errorr occurred ...";
|
||||
mysql_state = "HY000";
|
||||
|
||||
field_count = 0xff;
|
||||
gw_mysql_set_byte2(mysql_err, mysql_errno);
|
||||
mysql_statemsg[0]='#';
|
||||
memcpy(mysql_statemsg+1, mysql_state, 5);
|
||||
|
||||
if (msg != NULL) {
|
||||
mysql_error_msg = msg;
|
||||
}
|
||||
|
||||
mysql_payload_size = sizeof(field_count) +
|
||||
sizeof(mysql_err) +
|
||||
sizeof(mysql_statemsg) +
|
||||
strlen(mysql_error_msg);
|
||||
|
||||
/** allocate memory for packet header + payload */
|
||||
errbuf = gwbuf_alloc(sizeof(mysql_packet_header) + mysql_payload_size);
|
||||
ss_dassert(errbuf != NULL);
|
||||
|
||||
if (errbuf == NULL)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
outbuf = GWBUF_DATA(errbuf);
|
||||
|
||||
/** write packet header and packet number */
|
||||
gw_mysql_set_byte3(mysql_packet_header, mysql_payload_size);
|
||||
mysql_packet_header[3] = packet_number;
|
||||
|
||||
/** write header */
|
||||
memcpy(outbuf, mysql_packet_header, sizeof(mysql_packet_header));
|
||||
|
||||
mysql_payload = outbuf + sizeof(mysql_packet_header);
|
||||
|
||||
/** write field */
|
||||
memcpy(mysql_payload, &field_count, sizeof(field_count));
|
||||
mysql_payload = mysql_payload + sizeof(field_count);
|
||||
|
||||
/** write errno */
|
||||
memcpy(mysql_payload, mysql_err, sizeof(mysql_err));
|
||||
mysql_payload = mysql_payload + sizeof(mysql_err);
|
||||
|
||||
/** write sqlstate */
|
||||
memcpy(mysql_payload, mysql_statemsg, sizeof(mysql_statemsg));
|
||||
mysql_payload = mysql_payload + sizeof(mysql_statemsg);
|
||||
|
||||
/** write error message */
|
||||
memcpy(mysql_payload, mysql_error_msg, strlen(mysql_error_msg));
|
||||
|
||||
return dcb->func.write(dcb, errbuf);
|
||||
}
|
||||
|
@ -55,7 +55,7 @@ extern size_t log_ses_count[];
|
||||
extern __thread log_info_t tls_log_info;
|
||||
|
||||
|
||||
static void blr_file_create(ROUTER_INSTANCE *router, char *file);
|
||||
static int blr_file_create(ROUTER_INSTANCE *router, char *file);
|
||||
static void blr_file_append(ROUTER_INSTANCE *router, char *file);
|
||||
static uint32_t extract_field(uint8_t *src, int bits);
|
||||
static void blr_log_header(logfile_id_t file, char *msg, uint8_t *ptr);
|
||||
@ -68,7 +68,7 @@ static void blr_log_header(logfile_id_t file, char *msg, uint8_t *ptr);
|
||||
*
|
||||
* @param router The router instance this defines the master for this replication chain
|
||||
*/
|
||||
void
|
||||
int
|
||||
blr_file_init(ROUTER_INSTANCE *router)
|
||||
{
|
||||
char *ptr, path[1024], filename[1050];
|
||||
@ -92,16 +92,28 @@ struct dirent *dp;
|
||||
|
||||
router->binlogdir = strdup(path);
|
||||
}
|
||||
else
|
||||
{
|
||||
strncpy(path, router->binlogdir, 1024);
|
||||
}
|
||||
if (access(router->binlogdir, R_OK) == -1)
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
|
||||
"%s: Unable to read the binlog directory %s.",
|
||||
router->service->name, router->binlogdir)));
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* First try to find a binlog file number by reading the directory */
|
||||
root_len = strlen(router->fileroot);
|
||||
dirp = opendir(path);
|
||||
if ((dirp = opendir(path)) == NULL)
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
|
||||
"%s: Unable to read the binlog directory %s, %s.",
|
||||
router->service->name, router->binlogdir,
|
||||
strerror(errno))));
|
||||
return 0;
|
||||
}
|
||||
while ((dp = readdir(dirp)) != NULL)
|
||||
{
|
||||
if (strncmp(dp->d_name, router->fileroot, root_len) == 0)
|
||||
@ -134,20 +146,21 @@ struct dirent *dp;
|
||||
router->initbinlog);
|
||||
else
|
||||
sprintf(filename, BINLOG_NAMEFMT, router->fileroot, 1);
|
||||
blr_file_create(router, filename);
|
||||
if (! blr_file_create(router, filename))
|
||||
return 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
sprintf(filename, BINLOG_NAMEFMT, router->fileroot, n);
|
||||
blr_file_append(router, filename);
|
||||
}
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
void
|
||||
int
|
||||
blr_file_rotate(ROUTER_INSTANCE *router, char *file, uint64_t pos)
|
||||
{
|
||||
blr_file_create(router, file);
|
||||
return blr_file_create(router, file);
|
||||
}
|
||||
|
||||
|
||||
@ -156,8 +169,9 @@ blr_file_rotate(ROUTER_INSTANCE *router, char *file, uint64_t pos)
|
||||
*
|
||||
* @param router The router instance
|
||||
* @param file The binlog file name
|
||||
* @return Non-zero if the fie creation succeeded
|
||||
*/
|
||||
static void
|
||||
static int
|
||||
blr_file_create(ROUTER_INSTANCE *router, char *file)
|
||||
{
|
||||
char path[1024];
|
||||
@ -175,7 +189,9 @@ unsigned char magic[] = BINLOG_MAGIC;
|
||||
else
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
|
||||
"Failed to create binlog file %s", path)));
|
||||
"%s: Failed to create binlog file %s, %s.",
|
||||
router->service->name, path, strerror(errno))));
|
||||
return 0;
|
||||
}
|
||||
fsync(fd);
|
||||
close(router->binlog_fd);
|
||||
@ -184,6 +200,7 @@ unsigned char magic[] = BINLOG_MAGIC;
|
||||
router->binlog_position = 4; /* Initial position after the magic number */
|
||||
spinlock_release(&router->binlog_lock);
|
||||
router->binlog_fd = fd;
|
||||
return 1;
|
||||
}
|
||||
|
||||
|
||||
@ -225,15 +242,31 @@ int fd;
|
||||
* @param router The router instance
|
||||
* @param buf The binlog record
|
||||
* @param len The length of the binlog record
|
||||
* @return Return the number of bytes written
|
||||
*/
|
||||
void
|
||||
int
|
||||
blr_write_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *buf)
|
||||
{
|
||||
pwrite(router->binlog_fd, buf, hdr->event_size, hdr->next_pos - hdr->event_size);
|
||||
int n;
|
||||
|
||||
if ((n = pwrite(router->binlog_fd, buf, hdr->event_size,
|
||||
hdr->next_pos - hdr->event_size)) != hdr->event_size)
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
|
||||
"%s: Failed to write binlog record at %d of %s, %s. "
|
||||
"Truncating to previous record.",
|
||||
router->service->name, hdr->next_pos - hdr->event_size,
|
||||
router->binlog_name,
|
||||
strerror(errno))));
|
||||
/* Remove any partual event that was written */
|
||||
ftruncate(router->binlog_fd, hdr->next_pos - hdr->event_size);
|
||||
return 0;
|
||||
}
|
||||
spinlock_acquire(&router->binlog_lock);
|
||||
router->binlog_position = hdr->next_pos;
|
||||
router->last_written = hdr->next_pos - hdr->event_size;
|
||||
spinlock_release(&router->binlog_lock);
|
||||
return n;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -71,13 +71,13 @@ static GWBUF *blr_make_registration(ROUTER_INSTANCE *router);
|
||||
static GWBUF *blr_make_binlog_dump(ROUTER_INSTANCE *router);
|
||||
void encode_value(unsigned char *data, unsigned int value, int len);
|
||||
void blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt);
|
||||
static void blr_rotate_event(ROUTER_INSTANCE *router, uint8_t *pkt, REP_HEADER *hdr);
|
||||
static int blr_rotate_event(ROUTER_INSTANCE *router, uint8_t *pkt, REP_HEADER *hdr);
|
||||
void blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr);
|
||||
static void *CreateMySQLAuthData(char *username, char *password, char *database);
|
||||
void blr_extract_header(uint8_t *pkt, REP_HEADER *hdr);
|
||||
inline uint32_t extract_field(uint8_t *src, int bits);
|
||||
static void blr_log_packet(logfile_id_t file, char *msg, uint8_t *ptr, int len);
|
||||
|
||||
static void blr_master_close(ROUTER_INSTANCE *);
|
||||
static int keepalive = 1;
|
||||
|
||||
/**
|
||||
@ -247,6 +247,37 @@ int do_reconnect = 0;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Shutdown a connection to the master
|
||||
*
|
||||
* @param router The router instance
|
||||
*/
|
||||
void
|
||||
blr_master_close(ROUTER_INSTANCE *router)
|
||||
{
|
||||
dcb_close(router->master);
|
||||
router->master_state = BLRM_UNCONNECTED;
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark this master connection for a delayed reconnect, used during
|
||||
* error recovery to cause a reconnect after 60 seconds.
|
||||
*
|
||||
* @param router The router instance
|
||||
*/
|
||||
void
|
||||
blr_master_delayed_connect(ROUTER_INSTANCE *router)
|
||||
{
|
||||
char *name;
|
||||
|
||||
if ((name = malloc(strlen(router->service->name)
|
||||
+ strlen(" Master Recovery")+1)) != NULL);
|
||||
{
|
||||
sprintf(name, "%s Master Recovery", router->service->name);
|
||||
hktask_oneshot(name, blr_start_master, router, 60);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Binlog router master side state machine event handler.
|
||||
*
|
||||
@ -407,6 +438,20 @@ char query[128];
|
||||
case BLRM_SELECTVER:
|
||||
// Response to SELECT VERSION should be stored
|
||||
router->saved_master.selectver = buf;
|
||||
buf = blr_make_query("SELECT @@version_comment limit 1;");
|
||||
router->master_state = BLRM_SELECTVERCOM;
|
||||
router->master->func.write(router->master, buf);
|
||||
break;
|
||||
case BLRM_SELECTVERCOM:
|
||||
// Response to SELECT @@version_comment should be stored
|
||||
router->saved_master.selectvercom = buf;
|
||||
buf = blr_make_query("SELECT @@hostname;");
|
||||
router->master_state = BLRM_SELECTHOSTNAME;
|
||||
router->master->func.write(router->master, buf);
|
||||
break;
|
||||
case BLRM_SELECTHOSTNAME:
|
||||
// Response to SELECT @@hostname should be stored
|
||||
router->saved_master.selecthostname = buf;
|
||||
buf = blr_make_registration(router);
|
||||
router->master_state = BLRM_REGISTER;
|
||||
router->master->func.write(router->master, buf);
|
||||
@ -809,10 +854,36 @@ static REP_HEADER phdr;
|
||||
// into the binlog file
|
||||
if (hdr.event_type == ROTATE_EVENT)
|
||||
router->rotating = 1;
|
||||
blr_write_binlog_record(router, &hdr, ptr);
|
||||
if (blr_write_binlog_record(router, &hdr, ptr) == 0)
|
||||
{
|
||||
/*
|
||||
* Failed to write to the
|
||||
* binlog file, destroy the
|
||||
* buffer chain and close the
|
||||
* connection with the master
|
||||
*/
|
||||
while ((pkt = gwbuf_consume(pkt,
|
||||
GWBUF_LENGTH(pkt))) != NULL);
|
||||
blr_master_close(router);
|
||||
blr_master_delayed_connect(router);
|
||||
return;
|
||||
}
|
||||
if (hdr.event_type == ROTATE_EVENT)
|
||||
{
|
||||
blr_rotate_event(router, ptr, &hdr);
|
||||
if (!blr_rotate_event(router, ptr, &hdr))
|
||||
{
|
||||
/*
|
||||
* Failed to write to the
|
||||
* binlog file, destroy the
|
||||
* buffer chain and close the
|
||||
* connection with the master
|
||||
*/
|
||||
while ((pkt = gwbuf_consume(pkt,
|
||||
GWBUF_LENGTH(pkt))) != NULL);
|
||||
blr_master_close(router);
|
||||
blr_master_delayed_connect(router);
|
||||
return;
|
||||
}
|
||||
}
|
||||
blr_distribute_binlog_record(router, &hdr, ptr);
|
||||
}
|
||||
@ -833,7 +904,20 @@ static REP_HEADER phdr;
|
||||
if (hdr.event_type == ROTATE_EVENT)
|
||||
{
|
||||
router->rotating = 1;
|
||||
blr_rotate_event(router, ptr, &hdr);
|
||||
if (!blr_rotate_event(router, ptr, &hdr))
|
||||
{
|
||||
/*
|
||||
* Failed to write to the
|
||||
* binlog file, destroy the
|
||||
* buffer chain and close the
|
||||
* connection with the master
|
||||
*/
|
||||
while ((pkt = gwbuf_consume(pkt,
|
||||
GWBUF_LENGTH(pkt))) != NULL);
|
||||
blr_master_close(router);
|
||||
blr_master_delayed_connect(router);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -933,7 +1017,7 @@ register uint32_t rval = 0, shift = 0;
|
||||
* @param ptr The packet containing the rotate event
|
||||
* @param hdr The replication message header
|
||||
*/
|
||||
static void
|
||||
static int
|
||||
blr_rotate_event(ROUTER_INSTANCE *router, uint8_t *ptr, REP_HEADER *hdr)
|
||||
{
|
||||
int len, slen;
|
||||
@ -963,9 +1047,14 @@ char file[BINLOG_FNAMELEN+1];
|
||||
if (strncmp(router->binlog_name, file, slen) != 0)
|
||||
{
|
||||
router->stats.n_rotates++;
|
||||
blr_file_rotate(router, file, pos);
|
||||
if (blr_file_rotate(router, file, pos) == 0)
|
||||
{
|
||||
router->rotating = 0;
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
router->rotating = 0;
|
||||
return 1;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -111,12 +111,20 @@ blr_slave_request(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
|
||||
case COM_BINLOG_DUMP:
|
||||
return blr_slave_binlog_dump(router, slave, queue);
|
||||
break;
|
||||
case COM_STATISTICS:
|
||||
return blr_statistics(router, slave, queue);
|
||||
break;
|
||||
case COM_PING:
|
||||
return blr_ping(router, slave, queue);
|
||||
break;
|
||||
case COM_QUIT:
|
||||
LOGIF(LD, (skygw_log_write(LOGFILE_DEBUG,
|
||||
"COM_QUIT received from slave with server_id %d",
|
||||
slave->serverid)));
|
||||
break;
|
||||
default:
|
||||
blr_send_custom_error(slave->dcb, 1, 0,
|
||||
"MySQL command not supported by the binlog router.");
|
||||
LOGIF(LE, (skygw_log_write(
|
||||
LOGFILE_ERROR,
|
||||
"Unexpected MySQL Command (%d) received from slave",
|
||||
@ -133,12 +141,14 @@ blr_slave_request(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
|
||||
* when MaxScale registered as a slave. The exception to the rule is the
|
||||
* request to obtain the current timestamp value of the server.
|
||||
*
|
||||
* Five select statements are currently supported:
|
||||
* Seven select statements are currently supported:
|
||||
* SELECT UNIX_TIMESTAMP();
|
||||
* SELECT @master_binlog_checksum
|
||||
* SELECT @@GLOBAL.GTID_MODE
|
||||
* SELECT VERSION()
|
||||
* SELECT 1
|
||||
* SELECT @@version_comment limit 1
|
||||
* SELECT @@hostname
|
||||
*
|
||||
* Two show commands are supported:
|
||||
* SHOW VARIABLES LIKE 'SERVER_ID'
|
||||
@ -208,6 +218,16 @@ int query_len;
|
||||
free(query_text);
|
||||
return blr_slave_replay(router, slave, router->saved_master.selectver);
|
||||
}
|
||||
else if (strcasecmp(word, "@@version_comment") == 0)
|
||||
{
|
||||
free(query_text);
|
||||
return blr_slave_replay(router, slave, router->saved_master.selectvercom);
|
||||
}
|
||||
else if (strcasecmp(word, "@@hostname") == 0)
|
||||
{
|
||||
free(query_text);
|
||||
return blr_slave_replay(router, slave, router->saved_master.selecthostname);
|
||||
}
|
||||
}
|
||||
else if (strcasecmp(word, "SHOW") == 0)
|
||||
{
|
||||
@ -251,6 +271,8 @@ int query_len;
|
||||
}
|
||||
else if (strcasecmp(word, "@slave_uuid") == 0)
|
||||
{
|
||||
if ((word = strtok_r(NULL, sep, &brkb)) != NULL)
|
||||
slave->uuid = strdup(word);
|
||||
free(query_text);
|
||||
return blr_slave_replay(router, slave, router->saved_master.setslaveuuid);
|
||||
}
|
||||
@ -1075,3 +1097,81 @@ uint32_t chksum;
|
||||
encode_value(ptr, chksum, 32);
|
||||
slave->dcb->func.write(slave->dcb, head);
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Send the field count packet in a response packet sequence.
|
||||
*
|
||||
* @param router The router
|
||||
* @param slave The slave connection
|
||||
* @param count Number of columns in the result set
|
||||
* @return Non-zero on success
|
||||
*/
|
||||
static int
|
||||
blr_slave_send_fieldcount(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, int count)
|
||||
{
|
||||
GWBUF *pkt;
|
||||
uint8_t *ptr;
|
||||
|
||||
if ((pkt = gwbuf_alloc(5)) == NULL)
|
||||
return 0;
|
||||
ptr = GWBUF_DATA(pkt);
|
||||
encode_value(ptr, 1, 24); // Add length of data packet
|
||||
ptr += 3;
|
||||
*ptr++ = 0x01; // Sequence number in response
|
||||
*ptr++ = count; // Length of result string
|
||||
return slave->dcb->func.write(slave->dcb, pkt);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Send the column definition packet in a response packet sequence.
|
||||
*
|
||||
* @param router The router
|
||||
* @param slave The slave connection
|
||||
* @param name Name of the column
|
||||
* @param type Column type
|
||||
* @param len Column length
|
||||
* @param seqno Packet sequence number
|
||||
* @return Non-zero on success
|
||||
*/
|
||||
static int
|
||||
blr_slave_send_columndef(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, char *name, int type, int len, uint8_t seqno)
|
||||
{
|
||||
GWBUF *pkt;
|
||||
uint8_t *ptr;
|
||||
|
||||
if ((pkt = gwbuf_alloc(26 + strlen(name))) == NULL)
|
||||
return 0;
|
||||
ptr = GWBUF_DATA(pkt);
|
||||
encode_value(ptr, 22 + strlen(name), 24); // Add length of data packet
|
||||
ptr += 3;
|
||||
*ptr++ = seqno; // Sequence number in response
|
||||
*ptr++ = 3; // Catalog is always def
|
||||
*ptr++ = 'd';
|
||||
*ptr++ = 'e';
|
||||
*ptr++ = 'f';
|
||||
*ptr++ = 0; // Schema name length
|
||||
*ptr++ = 0; // virtal table name length
|
||||
*ptr++ = 0; // Table name length
|
||||
*ptr++ = strlen(name); // Column name length;
|
||||
while (*name)
|
||||
*ptr++ = *name++; // Copy the column name
|
||||
*ptr++ = 0; // Orginal column name
|
||||
*ptr++ = 0x0c; // Length of next fields always 12
|
||||
*ptr++ = 0x3f; // Character set
|
||||
*ptr++ = 0;
|
||||
encode_value(ptr, len, 32); // Add length of column
|
||||
ptr += 4;
|
||||
*ptr++ = type;
|
||||
*ptr++ = 0x81; // Two bytes of flags
|
||||
if (type == 0xfd)
|
||||
*ptr++ = 0x1f;
|
||||
else
|
||||
*ptr++ = 0x00;
|
||||
*ptr++= 0;
|
||||
*ptr++= 0;
|
||||
*ptr++= 0;
|
||||
return slave->dcb->func.write(slave->dcb, pkt);
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user