Improvements to debug interface & blr updates

This commit is contained in:
Mark Riddoch
2014-05-21 17:25:21 +01:00
parent 8b3ea0c4d9
commit c1d39999ff
9 changed files with 158 additions and 25 deletions

View File

@ -30,6 +30,7 @@
* 14/02/14 Massimiliano Pinto Added enable_root_user in the service_params list * 14/02/14 Massimiliano Pinto Added enable_root_user in the service_params list
* 11/03/14 Massimiliano Pinto Added Unix socket support * 11/03/14 Massimiliano Pinto Added Unix socket support
* 11/05/14 Massimiliano Pinto Added version_string support to service * 11/05/14 Massimiliano Pinto Added version_string support to service
* 19/05/14 Mark Riddoch Added unique names from section headers
* *
* @endverbatim * @endverbatim
*/ */
@ -299,6 +300,7 @@ int error_count = 0;
obj->element = server_alloc(address, obj->element = server_alloc(address,
protocol, protocol,
atoi(port)); atoi(port));
server_set_unique_name(obj->element, obj->object);
} }
else else
{ {

View File

@ -67,6 +67,7 @@ SERVER *server;
server->nextdb = NULL; server->nextdb = NULL;
server->monuser = NULL; server->monuser = NULL;
server->monpw = NULL; server->monpw = NULL;
server->unique_name = NULL;
spinlock_acquire(&server_spin); spinlock_acquire(&server_spin);
server->next = allServers; server->next = allServers;
@ -109,10 +110,49 @@ SERVER *ptr;
/* Clean up session and free the memory */ /* Clean up session and free the memory */
free(server->name); free(server->name);
free(server->protocol); free(server->protocol);
if (server->unique_name)
free(server->unique_name);
free(server); free(server);
return 1; return 1;
} }
/**
* Set a unique name for the server
*
* @param server The server to ste the name on
* @param name The unique name for the server
*/
void
server_set_unique_name(SERVER *server, char *name)
{
server->unique_name = strdup(name);
}
/**
* Find an existing server using the unique section name in
* configuration file
*
* @param servname The Server name or address
* @param port The server port
* @return The server or NULL if not found
*/
SERVER *
server_find_by_unique_name(char *name)
{
SERVER *server;
spinlock_acquire(&server_spin);
server = allServers;
while (server)
{
if (strcmp(server->unique_name, name) == 0)
break;
server = server->next;
}
spinlock_release(&server_spin);
return server;
}
/** /**
* Find an existing server * Find an existing server
* *
@ -190,7 +230,7 @@ char *stat;
ptr = allServers; ptr = allServers;
while (ptr) while (ptr)
{ {
dcb_printf(dcb, "Server %p\n", ptr); dcb_printf(dcb, "Server %p (%s)\n", ptr, ptr->unique_name);
dcb_printf(dcb, "\tServer: %s\n", ptr->name); dcb_printf(dcb, "\tServer: %s\n", ptr->name);
stat = server_status(ptr); stat = server_status(ptr);
dcb_printf(dcb, "\tStatus: %s\n", stat); dcb_printf(dcb, "\tStatus: %s\n", stat);
@ -215,7 +255,7 @@ dprintServer(DCB *dcb, SERVER *server)
{ {
char *stat; char *stat;
dcb_printf(dcb, "Server %p\n", server); dcb_printf(dcb, "Server %p (%s)\n", server, server->unique_name);
dcb_printf(dcb, "\tServer: %s\n", server->name); dcb_printf(dcb, "\tServer: %s\n", server->name);
stat = server_status(server); stat = server_status(server);
dcb_printf(dcb, "\tStatus: %s\n", stat); dcb_printf(dcb, "\tStatus: %s\n", stat);

View File

@ -51,6 +51,7 @@ typedef struct {
* between the gateway and the server. * between the gateway and the server.
*/ */
typedef struct server { typedef struct server {
char *unique_name; /**< Unique name for the server */
char *name; /**< Server name/IP address*/ char *name; /**< Server name/IP address*/
unsigned short port; /**< Port to listen on */ unsigned short port; /**< Port to listen on */
char *protocol; /**< Protocol module to use */ char *protocol; /**< Protocol module to use */
@ -103,6 +104,7 @@ typedef struct server {
extern SERVER *server_alloc(char *, char *, unsigned short); extern SERVER *server_alloc(char *, char *, unsigned short);
extern int server_free(SERVER *); extern int server_free(SERVER *);
extern SERVER *server_find_by_unique_name(char *);
extern SERVER *server_find(char *, unsigned short); extern SERVER *server_find(char *, unsigned short);
extern void printServer(SERVER *); extern void printServer(SERVER *);
extern void printAllServers(); extern void printAllServers();

View File

@ -148,4 +148,5 @@ extern int service_refresh_users(SERVICE *);
extern void printService(SERVICE *); extern void printService(SERVICE *);
extern void printAllServices(); extern void printAllServices();
extern void dprintAllServices(DCB *); extern void dprintAllServices(DCB *);
extern void dprintService(DCB *, SERVICE *);
#endif #endif

View File

@ -540,7 +540,7 @@ int i = 0;
dcb_printf(dcb, "\t\tServer-id: %d\n", session->serverid); dcb_printf(dcb, "\t\tServer-id: %d\n", session->serverid);
if (session->hostname) if (session->hostname)
dcb_printf(dcb, "\t\tHostname: %s\n", session->hostname); dcb_printf(dcb, "\t\tHostname: %s\n", session->hostname);
dcb_printf(dcb, "\t\tSlave DCB: %x\n", session->dcb); dcb_printf(dcb, "\t\tSlave DCB: %p\n", session->dcb);
dcb_printf(dcb, "\t\tNext Sequence No: %d\n", session->seqno); dcb_printf(dcb, "\t\tNext Sequence No: %d\n", session->seqno);
dcb_printf(dcb, "\t\tState: %s\n", blrs_states[session->state]); dcb_printf(dcb, "\t\tState: %s\n", blrs_states[session->state]);
dcb_printf(dcb, "\t\tBinlog file: %s\n", session->binlogfile); dcb_printf(dcb, "\t\tBinlog file: %s\n", session->binlogfile);

View File

@ -423,21 +423,39 @@ int no_residual = 1;
len = extract_field(pdata, 24) + 4; len = extract_field(pdata, 24) + 4;
} }
if (reslen < len && gwbuf_length(pkt) >= len) // Message straddles buffers if (reslen < len && gwbuf_length(pkt) >= len)
{ {
/* Allocate a contiguous buffer for the binlog message */ /*
* The message is contianed in more than the current
* buffer, however we have the complete messasge in
* this buffer and the chain of remaining buffers.
*
* Allocate a contiguous buffer for the binlog message
* and copy the complete message into this buffer.
*/
msg = malloc(len); msg = malloc(len);
if (GWBUF_LENGTH(pkt->next) < len - reslen)
printf("Packet (length %d) spans more than 2 buffers\n", len);
memcpy(msg, pdata, reslen); memcpy(msg, pdata, reslen);
memcpy(&msg[reslen], GWBUF_DATA(pkt->next), len - reslen); memcpy(&msg[reslen], GWBUF_DATA(pkt->next), len - reslen);
ptr = msg; ptr = msg;
} }
else if (reslen < len) // Message straddles buffers else if (reslen < len)
{ {
/*
* The message is not fully contained in the current
* and we do not have the complete message in the
* buffer chain. Therefore we must stop processing until
* we receive the next buffer.
*/
break; break;
} }
else else
{ {
/*
* The message is fully contained in the current buffer
*/
ptr = pdata; ptr = pdata;
msg = NULL; msg = NULL;
} }
@ -474,10 +492,6 @@ int no_residual = 1;
} }
else else
{ {
if (hdr.event_type == ROTATE_EVENT)
{
blr_rotate_event(router, ptr, &hdr);
}
if (hdr.event_type == HEARTBEAT_EVENT) if (hdr.event_type == HEARTBEAT_EVENT)
{ {
#ifdef SHOW_EVENTS #ifdef SHOW_EVENTS
@ -490,8 +504,20 @@ int no_residual = 1;
ptr = ptr + 5; // We don't put the first byte of the payload ptr = ptr + 5; // We don't put the first byte of the payload
// into the binlog file // into the binlog file
blr_write_binlog_record(router, &hdr, ptr); blr_write_binlog_record(router, &hdr, ptr);
if (hdr.event_type == ROTATE_EVENT)
{
blr_rotate_event(router, ptr, &hdr);
}
blr_distribute_binlog_record(router, &hdr, ptr); blr_distribute_binlog_record(router, &hdr, ptr);
} }
else
{
ptr += 5;
if (hdr.event_type == ROTATE_EVENT)
{
blr_rotate_event(router, ptr, &hdr);
}
}
} }
} }
else else
@ -508,7 +534,7 @@ int no_residual = 1;
} }
else else
{ {
pkt = gwbuf_consume(pkt, 4 + hdr.payload_len); pkt = gwbuf_consume(pkt, len);
} }
} }
@ -578,8 +604,6 @@ int len;
uint64_t pos; uint64_t pos;
char file[BINLOG_FNAMELEN+1]; char file[BINLOG_FNAMELEN+1];
ptr += 4; // Skip packet header
ptr++; // Skip the OK
ptr += 19; // Skip event header ptr += 19; // Skip event header
len = hdr->event_size - 19; // Event size minus header len = hdr->event_size - 19; // Event size minus header
pos = extract_field(ptr, 32) + (extract_field(ptr+4, 32) << 32); pos = extract_field(ptr, 32) + (extract_field(ptr+4, 32) << 32);
@ -649,6 +673,10 @@ ROUTER_SLAVE *slave;
memcpy(buf, ptr, hdr->event_size); memcpy(buf, ptr, hdr->event_size);
slave->dcb->func.write(slave->dcb, pkt); slave->dcb->func.write(slave->dcb, pkt);
slave->binlog_pos = hdr->next_pos; slave->binlog_pos = hdr->next_pos;
if (hdr->event_type == ROTATE_EVENT)
{
blr_slave_rotate(slave, ptr);
}
} }
slave = slave->next; slave = slave->next;

View File

@ -719,10 +719,36 @@ struct timespec req;
*ptr++ = slave->seqno++; *ptr++ = slave->seqno++;
*ptr++ = 0; // OK *ptr++ = 0; // OK
head = gwbuf_append(head, record); head = gwbuf_append(head, record);
if (hdr.event_type == ROTATE_EVENT)
{
close(fd);
blr_slave_rotate(slave, GWBUF_DATA(record));
if ((fd = blr_open_binlog(router, slave->binlogfile)) == -1)
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"blr_slave_catchup failed to open binlog file %s\n",
slave->binlogfile)));
break;
}
}
written = slave->dcb->func.write(slave->dcb, head); written = slave->dcb->func.write(slave->dcb, head);
if (written) if (written)
slave->binlog_pos = hdr.next_pos; slave->binlog_pos = hdr.next_pos;
rval = written; rval = written;
if (hdr.event_type == ROTATE_EVENT)
{
close(fd);
blr_slave_rotate(slave, GWBUF_DATA(record));
if ((fd = blr_open_binlog(router, slave->binlogfile)) == -1)
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"blr_slave_catchup failed to open binlog file %s\n",
slave->binlogfile)));
break;
}
}
atomic_add(&slave->stats.n_events, 1); atomic_add(&slave->stats.n_events, 1);
burst++; burst++;
} }
@ -765,3 +791,18 @@ ROUTER_INSTANCE *router = slave->router;
blr_slave_catchup(router, slave); blr_slave_catchup(router, slave);
} }
} }
/**
* Rotate the slave to the new binlog file
*
* @param slave The slave instance
* @param ptr The rotate event (minux header and OK byte)
*/
void
blr_slave_rotate(ROUTER_SLAVE *slave, uint8_t *ptr)
{
ptr += 19; // Skip header
slave->binlog_pos = extract_field(ptr, 32) + (extract_field(ptr+4, 32) << 32);
memcpy(slave->binlogfile, ptr + 8, BINLOG_FNAMELEN);
slave->binlogfile[BINLOG_FNAMELEN] = 0;
}

View File

@ -36,7 +36,9 @@
* Date Who Description * Date Who Description
* 20/06/13 Mark Riddoch Initial implementation * 20/06/13 Mark Riddoch Initial implementation
* 17/07/13 Mark Riddoch Additional commands * 17/07/13 Mark Riddoch Additional commands
* 09/08/2013 Massimiliano Pinto Addes enable/disable commands (now only for log) * 09/08/2013 Massimiliano Pinto Added enable/disable commands (now only for log)
* 20/05/14 Mark Riddoch Added ability to give server and service names rather
* than simply addresses
* *
* @endverbatim * @endverbatim
*/ */
@ -69,6 +71,8 @@
#define ARG_TYPE_ADDRESS 1 #define ARG_TYPE_ADDRESS 1
#define ARG_TYPE_STRING 2 #define ARG_TYPE_STRING 2
#define ARG_TYPE_SERVICE 3 #define ARG_TYPE_SERVICE 3
#define ARG_TYPE_SERVER 4
#define ARG_TYPE_DBUSERS 5
/** /**
* The subcommand structure * The subcommand structure
* *
@ -91,16 +95,16 @@ struct subcommand showoptions[] = {
{0, 0, 0} }, {0, 0, 0} },
{ "dcb", 1, dprintDCB, "Show a single descriptor control block e.g. show dcb 0x493340", { "dcb", 1, dprintDCB, "Show a single descriptor control block e.g. show dcb 0x493340",
{ARG_TYPE_ADDRESS, 0, 0} }, {ARG_TYPE_ADDRESS, 0, 0} },
{ "dbusers", 1, dcb_usersPrint, "Show statistics and user names for a service's user table.\n\t\tExample : show dbusers <ptr of 'User's data' from services list>", { "dbusers", 1, dcb_usersPrint, "Show statistics and user names for a service's user table.\n\t\tExample : show dbusers <ptr of 'User's data' from services list>|<service name>",
{ARG_TYPE_ADDRESS, 0, 0} }, {ARG_TYPE_DBUSERS, 0, 0} },
{ "epoll", 0, dprintPollStats, "Show the poll statistics", { "epoll", 0, dprintPollStats, "Show the poll statistics",
{0, 0, 0} }, {0, 0, 0} },
{ "modules", 0, dprintAllModules, "Show all currently loaded modules", { "modules", 0, dprintAllModules, "Show all currently loaded modules",
{0, 0, 0} }, {0, 0, 0} },
{ "monitors", 0, monitorShowAll, "Show the monitors that are configured", { "monitors", 0, monitorShowAll, "Show the monitors that are configured",
{0, 0, 0} }, {0, 0, 0} },
{ "server", 1, dprintServer, "Show details for a server, e.g. show server 0x485390", { "server", 1, dprintServer, "Show details for a server, e.g. show server 0x485390. The address may also be repalced with the server name form the configuration file",
{ARG_TYPE_ADDRESS, 0, 0} }, {ARG_TYPE_SERVER, 0, 0} },
{ "servers", 0, dprintAllServers, "Show all configured servers", { "servers", 0, dprintAllServers, "Show all configured servers",
{0, 0, 0} }, {0, 0, 0} },
{ "services", 0, dprintAllServices, "Show all configured services in MaxScale", { "services", 0, dprintAllServices, "Show all configured services in MaxScale",
@ -143,7 +147,7 @@ struct subcommand shutdownoptions[] = {
"service", "service",
1, 1,
shutdown_service, shutdown_service,
"Shutdown a service, e.g. shutdown service 0x4838320", "Shutdown a service, e.g. shutdown service 0x4838320 or shutdown service \"Sales Database\"",
{ARG_TYPE_SERVICE, 0, 0} {ARG_TYPE_SERVICE, 0, 0}
}, },
{ {
@ -176,7 +180,7 @@ static void set_server(DCB *dcb, SERVER *server, char *bit);
*/ */
struct subcommand setoptions[] = { struct subcommand setoptions[] = {
{ "server", 2, set_server, "Set the status of a server. E.g. set server 0x4838320 master", { "server", 2, set_server, "Set the status of a server. E.g. set server 0x4838320 master",
{ARG_TYPE_ADDRESS, ARG_TYPE_STRING, 0} }, {ARG_TYPE_SERVER, ARG_TYPE_STRING, 0} },
{ NULL, 0, NULL, NULL, { NULL, 0, NULL, NULL,
{0, 0, 0} } {0, 0, 0} }
}; };
@ -187,7 +191,7 @@ static void clear_server(DCB *dcb, SERVER *server, char *bit);
*/ */
struct subcommand clearoptions[] = { struct subcommand clearoptions[] = {
{ "server", 2, clear_server, "Clear the status of a server. E.g. clear server 0x4838320 master", { "server", 2, clear_server, "Clear the status of a server. E.g. clear server 0x4838320 master",
{ARG_TYPE_ADDRESS, ARG_TYPE_STRING, 0} }, {ARG_TYPE_SERVER, ARG_TYPE_STRING, 0} },
{ NULL, 0, NULL, NULL, { NULL, 0, NULL, NULL,
{0, 0, 0} } {0, 0, 0} }
}; };
@ -200,9 +204,9 @@ static void reload_config(DCB *dcb);
*/ */
struct subcommand reloadoptions[] = { struct subcommand reloadoptions[] = {
{ "config", 0, reload_config, "Reload the configuration data for MaxScale.", { "config", 0, reload_config, "Reload the configuration data for MaxScale.",
{ARG_TYPE_ADDRESS, 0, 0} }, {0, 0, 0} },
{ "dbusers", 1, reload_dbusers, "Reload the dbuser data for a service. E.g. reload dbusers 0x849420", { "dbusers", 1, reload_dbusers, "Reload the dbuser data for a service. E.g. reload dbusers 0x849420",
{ARG_TYPE_ADDRESS, 0, 0} }, {ARG_TYPE_DBUSERS, 0, 0} },
{ NULL, 0, NULL, NULL, { NULL, 0, NULL, NULL,
{0, 0, 0} } {0, 0, 0} }
}; };
@ -359,6 +363,7 @@ static unsigned long
convert_arg(char *arg, int arg_type) convert_arg(char *arg, int arg_type)
{ {
unsigned long rval; unsigned long rval;
SERVICE *service;
switch (arg_type) switch (arg_type)
{ {
@ -370,6 +375,20 @@ unsigned long rval;
if ((rval = (unsigned long)strtol(arg, NULL, 0)) == 0) if ((rval = (unsigned long)strtol(arg, NULL, 0)) == 0)
rval = (unsigned long)service_find(arg); rval = (unsigned long)service_find(arg);
return rval; return rval;
case ARG_TYPE_SERVER:
if ((rval = (unsigned long)strtol(arg, NULL, 0)) == 0)
rval = (unsigned long)server_find_by_unique_name(arg);
return rval;
case ARG_TYPE_DBUSERS:
if ((rval = (unsigned long)strtol(arg, NULL, 0)) == 0)
{
service = service_find(arg);
if (service)
return (unsigned long)(service->users);
else
return 0;
}
return rval;
} }
return 0; return 0;
} }