Merge branch 'develop' into MXS-1209
This commit is contained in:
@ -77,6 +77,7 @@ static void closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_sessio
|
||||
static void freeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session);
|
||||
static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *queue);
|
||||
static void diagnostics(MXS_ROUTER *instance, DCB *dcb);
|
||||
static json_t* diagnostics_json(const MXS_ROUTER *instance);
|
||||
static void clientReply(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *queue,
|
||||
DCB *backend_dcb);
|
||||
static void errorReply(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *message,
|
||||
@ -157,6 +158,7 @@ MXS_MODULE* MXS_CREATE_MODULE()
|
||||
freeSession,
|
||||
routeQuery,
|
||||
diagnostics,
|
||||
diagnostics_json,
|
||||
clientReply,
|
||||
errorReply,
|
||||
getCapabilities,
|
||||
@ -802,47 +804,6 @@ routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *queu
|
||||
return avro_client_handle_request(router, client, queue);
|
||||
}
|
||||
|
||||
/* Not used
|
||||
static char *event_names[] =
|
||||
{
|
||||
"Invalid", "Start Event V3", "Query Event", "Stop Event", "Rotate Event",
|
||||
"Integer Session Variable", "Load Event", "Slave Event", "Create File Event",
|
||||
"Append Block Event", "Exec Load Event", "Delete File Event",
|
||||
"New Load Event", "Rand Event", "User Variable Event", "Format Description Event",
|
||||
"Transaction ID Event (2 Phase Commit)", "Begin Load Query Event",
|
||||
"Execute Load Query Event", "Table Map Event", "Write Rows Event (v0)",
|
||||
"Update Rows Event (v0)", "Delete Rows Event (v0)", "Write Rows Event (v1)",
|
||||
"Update Rows Event (v1)", "Delete Rows Event (v1)", "Incident Event",
|
||||
"Heartbeat Event", "Ignorable Event", "Rows Query Event", "Write Rows Event (v2)",
|
||||
"Update Rows Event (v2)", "Delete Rows Event (v2)", "GTID Event",
|
||||
"Anonymous GTID Event", "Previous GTIDS Event"
|
||||
};
|
||||
*/
|
||||
|
||||
/* Not used
|
||||
// New MariaDB event numbers starts from 0xa0
|
||||
static char *event_names_mariadb10[] =
|
||||
{
|
||||
"Annotate Rows Event",
|
||||
"Binlog Checkpoint Event",
|
||||
"GTID Event",
|
||||
"GTID List Event"
|
||||
};
|
||||
*/
|
||||
|
||||
/**
|
||||
* Display an entry from the spinlock statistics data
|
||||
*
|
||||
* @param dcb The DCB to print to
|
||||
* @param desc Description of the statistic
|
||||
* @param value The statistic value
|
||||
*/
|
||||
static void
|
||||
spin_reporter(void *dcb, char *desc, int value)
|
||||
{
|
||||
dcb_printf((DCB *) dcb, "\t\t%-35s %d\n", desc, value);
|
||||
}
|
||||
|
||||
/**
|
||||
* Display router diagnostics
|
||||
*
|
||||
@ -939,17 +900,6 @@ diagnostics(MXS_ROUTER *router, DCB *dcb)
|
||||
session->gtid.domain, session->gtid.server_id,
|
||||
session->gtid.seq);
|
||||
|
||||
// TODO: Add real value for this
|
||||
//dcb_printf(dcb, "\t\tAvro Transaction ID: %u\n", 0);
|
||||
// TODO: Add real value for this
|
||||
//dcb_printf(dcb, "\t\tAvro N.MaxTransactions: %u\n", 0);
|
||||
|
||||
#if SPINLOCK_PROFILE
|
||||
dcb_printf(dcb, "\tSpinlock statistics (catch_lock):\n");
|
||||
spinlock_stats(&session->catch_lock, spin_reporter, dcb);
|
||||
dcb_printf(dcb, "\tSpinlock statistics (rses_lock):\n");
|
||||
spinlock_stats(&session->file_lock, spin_reporter, dcb);
|
||||
#endif
|
||||
dcb_printf(dcb, "\t\t--------------------\n\n");
|
||||
session = session->next;
|
||||
}
|
||||
@ -957,6 +907,74 @@ diagnostics(MXS_ROUTER *router, DCB *dcb)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Display router diagnostics
|
||||
*
|
||||
* @param instance Instance of the router
|
||||
*/
|
||||
static json_t* diagnostics_json(const MXS_ROUTER *router)
|
||||
{
|
||||
AVRO_INSTANCE *router_inst = (AVRO_INSTANCE *)router;
|
||||
|
||||
json_t* rval = json_object();
|
||||
|
||||
char pathbuf[PATH_MAX + 1];
|
||||
snprintf(pathbuf, sizeof(pathbuf), "%s/%s", router_inst->avrodir, AVRO_PROGRESS_FILE);
|
||||
|
||||
json_object_set_new(rval, "infofile", json_string(pathbuf));
|
||||
json_object_set_new(rval, "avrodir", json_string(router_inst->avrodir));
|
||||
json_object_set_new(rval, "binlogdir", json_string(router_inst->binlogdir));
|
||||
json_object_set_new(rval, "binlog_name", json_string(router_inst->binlog_name));
|
||||
json_object_set_new(rval, "binlog_pos", json_integer(router_inst->current_pos));
|
||||
|
||||
snprintf(pathbuf, sizeof(pathbuf), "%lu-%lu-%lu", router_inst->gtid.domain,
|
||||
router_inst->gtid.server_id, router_inst->gtid.seq);
|
||||
json_object_set_new(rval, "gtid", json_string(pathbuf));
|
||||
json_object_set_new(rval, "gtid_timestamp", json_integer(router_inst->gtid.timestamp));
|
||||
json_object_set_new(rval, "gtid_event_number", json_integer(router_inst->gtid.event_num));
|
||||
json_object_set_new(rval, "clients", json_integer(router_inst->stats.n_clients));
|
||||
|
||||
if (router_inst->clients)
|
||||
{
|
||||
json_t* arr = json_array();
|
||||
spinlock_acquire(&router_inst->lock);
|
||||
|
||||
for (AVRO_CLIENT *session = router_inst->clients; session; session = session->next)
|
||||
{
|
||||
json_t* client = json_object();
|
||||
json_object_set_new(client, "uuid", json_string(session->uuid));
|
||||
json_object_set_new(client, "host", json_string(session->dcb->remote));
|
||||
json_object_set_new(client, "port", json_integer(dcb_get_port(session->dcb)));
|
||||
json_object_set_new(client, "user", json_string(session->dcb->user));
|
||||
json_object_set_new(client, "format", json_string(avro_client_ouput[session->format]));
|
||||
json_object_set_new(client, "state", json_string(avro_client_states[session->state]));
|
||||
json_object_set_new(client, "avrofile", json_string(session->avro_binfile));
|
||||
json_object_set_new(client, "avrofile_last_block",
|
||||
json_integer(session->avro_file.blocks_read));
|
||||
json_object_set_new(client, "avrofile_last_record",
|
||||
json_integer(session->avro_file.records_read));
|
||||
|
||||
if (session->gtid_start.domain > 0 || session->gtid_start.server_id > 0 ||
|
||||
session->gtid_start.seq > 0)
|
||||
{
|
||||
|
||||
snprintf(pathbuf, sizeof(pathbuf), "%lu-%lu-%lu", session->gtid_start.domain,
|
||||
session->gtid_start.server_id, session->gtid_start.seq);
|
||||
json_object_set_new(client, "requested_gtid", json_string(pathbuf));
|
||||
}
|
||||
snprintf(pathbuf, sizeof(pathbuf), "%lu-%lu-%lu", session->gtid.domain,
|
||||
session->gtid.server_id, session->gtid.seq);
|
||||
json_object_set_new(client, "current_gtid", json_string(pathbuf));
|
||||
json_array_append_new(arr, client);
|
||||
}
|
||||
spinlock_release(&router_inst->lock);
|
||||
|
||||
json_object_set_new(rval, "clients", arr);
|
||||
}
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
||||
/**
|
||||
* Client Reply routine - in this case this is a message from the
|
||||
* master server, It should be sent to the state machine that manages
|
||||
|
@ -96,6 +96,7 @@ static void closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_sessi
|
||||
static void freeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session);
|
||||
static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *queue);
|
||||
static void diagnostics(MXS_ROUTER *instance, DCB *dcb);
|
||||
static json_t* diagnostics_json(const MXS_ROUTER *instance);
|
||||
static void clientReply(MXS_ROUTER *instance,
|
||||
MXS_ROUTER_SESSION *router_session,
|
||||
GWBUF *queue,
|
||||
@ -160,6 +161,7 @@ MXS_MODULE* MXS_CREATE_MODULE()
|
||||
freeSession,
|
||||
routeQuery,
|
||||
diagnostics,
|
||||
diagnostics_json,
|
||||
clientReply,
|
||||
errorReply,
|
||||
getCapabilities,
|
||||
@ -242,8 +244,8 @@ createInstance(SERVICE *service, char **options)
|
||||
int rc = 0;
|
||||
char task_name[BLRM_TASK_NAME_LEN + 1] = "";
|
||||
|
||||
if (service->credentials.name == NULL ||
|
||||
service->credentials.authdata == NULL)
|
||||
if (!service->credentials.name[0] ||
|
||||
!service->credentials.authdata[0])
|
||||
{
|
||||
MXS_ERROR("%s: Error: Service is missing user credentials."
|
||||
" Add the missing username or passwd parameter to the service.",
|
||||
@ -1734,6 +1736,338 @@ diagnostics(MXS_ROUTER *router, DCB *dcb)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Display router diagnostics
|
||||
*
|
||||
* @param instance Instance of the router
|
||||
*/
|
||||
static json_t* diagnostics_json(const MXS_ROUTER *router)
|
||||
{
|
||||
ROUTER_INSTANCE *router_inst = (ROUTER_INSTANCE *)router;
|
||||
int minno = 0;
|
||||
double min5, min10, min15, min30;
|
||||
char buf[40];
|
||||
struct tm tm;
|
||||
|
||||
json_t* rval = json_object();
|
||||
|
||||
minno = router_inst->stats.minno;
|
||||
min30 = 0.0;
|
||||
min15 = 0.0;
|
||||
min10 = 0.0;
|
||||
min5 = 0.0;
|
||||
for (int j = 0; j < BLR_NSTATS_MINUTES; j++)
|
||||
{
|
||||
minno--;
|
||||
if (minno < 0)
|
||||
{
|
||||
minno += BLR_NSTATS_MINUTES;
|
||||
}
|
||||
min30 += router_inst->stats.minavgs[minno];
|
||||
if (j < 15)
|
||||
{
|
||||
min15 += router_inst->stats.minavgs[minno];
|
||||
}
|
||||
if (j < 10)
|
||||
{
|
||||
min10 += router_inst->stats.minavgs[minno];
|
||||
}
|
||||
if (j < 5)
|
||||
{
|
||||
min5 += router_inst->stats.minavgs[minno];
|
||||
}
|
||||
}
|
||||
min30 /= 30.0;
|
||||
min15 /= 15.0;
|
||||
min10 /= 10.0;
|
||||
min5 /= 5.0;
|
||||
|
||||
/* SSL options */
|
||||
if (router_inst->ssl_enabled)
|
||||
{
|
||||
json_t* obj = json_object();
|
||||
|
||||
json_object_set_new(obj, "ssl_ca_cert", json_string(router_inst->service->dbref->server->server_ssl->ssl_ca_cert));
|
||||
json_object_set_new(obj, "ssl_cert", json_string(router_inst->service->dbref->server->server_ssl->ssl_cert));
|
||||
json_object_set_new(obj, "ssl_key", json_string(router_inst->service->dbref->server->server_ssl->ssl_key));
|
||||
json_object_set_new(obj, "ssl_version", json_string(router_inst->ssl_version ? router_inst->ssl_version : "MAX"));
|
||||
|
||||
json_object_set_new(rval, "master_ssl", obj);
|
||||
}
|
||||
|
||||
/* Binlog Encryption options */
|
||||
if (router_inst->encryption.enabled)
|
||||
{
|
||||
json_t* obj = json_object();
|
||||
|
||||
json_object_set_new(obj, "key", json_string(
|
||||
router_inst->encryption.key_management_filename));
|
||||
json_object_set_new(obj, "algorithm", json_string(
|
||||
blr_get_encryption_algorithm(router_inst->encryption.encryption_algorithm)));
|
||||
json_object_set_new(obj, "key_length",
|
||||
json_integer(8 * router_inst->encryption.key_len));
|
||||
|
||||
json_object_set_new(rval, "master_encryption", obj);
|
||||
}
|
||||
|
||||
json_object_set_new(rval, "master_state", json_string(blrm_states[router_inst->master_state]));
|
||||
|
||||
localtime_r(&router_inst->stats.lastReply, &tm);
|
||||
asctime_r(&tm, buf);
|
||||
|
||||
|
||||
json_object_set_new(rval, "binlogdir", json_string(router_inst->binlogdir));
|
||||
json_object_set_new(rval, "heartbeat", json_integer(router_inst->heartbeat));
|
||||
json_object_set_new(rval, "master_starts", json_integer(router_inst->stats.n_masterstarts));
|
||||
json_object_set_new(rval, "master_reconnects", json_integer(router_inst->stats.n_delayedreconnects));
|
||||
json_object_set_new(rval, "binlog_name", json_string(router_inst->binlog_name));
|
||||
json_object_set_new(rval, "binlog_position", json_integer(router_inst->current_pos));
|
||||
|
||||
if (router_inst->trx_safe)
|
||||
{
|
||||
if (router_inst->pending_transaction.state != BLRM_NO_TRANSACTION)
|
||||
{
|
||||
json_object_set_new(rval, "current_trx_position", json_integer(router_inst->binlog_position));
|
||||
}
|
||||
}
|
||||
|
||||
json_object_set_new(rval, "slaves", json_integer(router_inst->stats.n_slaves));
|
||||
json_object_set_new(rval, "session_events", json_integer(router_inst->stats.n_binlogs_ses));
|
||||
json_object_set_new(rval, "total_events", json_integer(router_inst->stats.n_binlogs));
|
||||
json_object_set_new(rval, "bad_crc_count", json_integer(router_inst->stats.n_badcrc));
|
||||
|
||||
minno = router_inst->stats.minno - 1;
|
||||
if (minno == -1)
|
||||
{
|
||||
minno += BLR_NSTATS_MINUTES;
|
||||
}
|
||||
|
||||
json_object_set_new(rval, "events_0", json_real(router_inst->stats.minavgs[minno]));
|
||||
json_object_set_new(rval, "events_5", json_real(min5));
|
||||
json_object_set_new(rval, "events_10", json_real(min10));
|
||||
json_object_set_new(rval, "events_15", json_real(min15));
|
||||
json_object_set_new(rval, "events_30", json_real(min30));
|
||||
|
||||
json_object_set_new(rval, "fake_events", json_integer(router_inst->stats.n_fakeevents));
|
||||
|
||||
json_object_set_new(rval, "artificial_events", json_integer(router_inst->stats.n_artificial));
|
||||
|
||||
json_object_set_new(rval, "binlog_errors", json_integer(router_inst->stats.n_binlog_errors));
|
||||
json_object_set_new(rval, "binlog_rotates", json_integer(router_inst->stats.n_rotates));
|
||||
json_object_set_new(rval, "heartbeat_events", json_integer(router_inst->stats.n_heartbeats));
|
||||
json_object_set_new(rval, "events_read", json_integer(router_inst->stats.n_reads));
|
||||
json_object_set_new(rval, "residual_packets", json_integer(router_inst->stats.n_residuals));
|
||||
|
||||
double average_packets = router_inst->stats.n_reads != 0 ?
|
||||
((double)router_inst->stats.n_binlogs / router_inst->stats.n_reads) : 0;
|
||||
|
||||
json_object_set_new(rval, "average_events_per_packets", json_real(average_packets));
|
||||
|
||||
spinlock_acquire(&router_inst->lock);
|
||||
if (router_inst->stats.lastReply)
|
||||
{
|
||||
if (buf[strlen(buf) - 1] == '\n')
|
||||
{
|
||||
buf[strlen(buf) - 1] = '\0';
|
||||
}
|
||||
|
||||
json_object_set_new(rval, "latest_event", json_string(buf));
|
||||
|
||||
if (!router_inst->mariadb10_compat)
|
||||
{
|
||||
json_object_set_new(rval, "latest_event_type", json_string(
|
||||
(router_inst->lastEventReceived <= MAX_EVENT_TYPE) ?
|
||||
event_names[router_inst->lastEventReceived] : "unknown"));
|
||||
}
|
||||
else
|
||||
{
|
||||
char *ptr = NULL;
|
||||
if (router_inst->lastEventReceived <= MAX_EVENT_TYPE)
|
||||
{
|
||||
ptr = event_names[router_inst->lastEventReceived];
|
||||
}
|
||||
else
|
||||
{
|
||||
/* Check MariaDB 10 new events */
|
||||
if (router_inst->lastEventReceived >= MARIADB_NEW_EVENTS_BEGIN &&
|
||||
router_inst->lastEventReceived <= MAX_EVENT_TYPE_MARIADB10)
|
||||
{
|
||||
ptr = event_names_mariadb10[(router_inst->lastEventReceived - MARIADB_NEW_EVENTS_BEGIN)];
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
json_object_set_new(rval, "latest_event_type", json_string((ptr != NULL) ? ptr : "unknown"));
|
||||
|
||||
if (router_inst->mariadb_gtid &&
|
||||
router_inst->last_mariadb_gtid[0])
|
||||
{
|
||||
json_object_set_new(rval, "latest_gtid", json_string(router_inst->last_mariadb_gtid));
|
||||
}
|
||||
}
|
||||
|
||||
if (router_inst->lastEventTimestamp)
|
||||
{
|
||||
time_t last_event = (time_t)router_inst->lastEventTimestamp;
|
||||
localtime_r(&last_event, &tm);
|
||||
asctime_r(&tm, buf);
|
||||
if (buf[strlen(buf) - 1] == '\n')
|
||||
{
|
||||
buf[strlen(buf) - 1] = '\0';
|
||||
}
|
||||
|
||||
json_object_set_new(rval, "latest_event_timestamp", json_string(buf));
|
||||
}
|
||||
}
|
||||
spinlock_release(&router_inst->lock);
|
||||
|
||||
json_object_set_new(rval, "active_logs", json_boolean(router_inst->active_logs));
|
||||
json_object_set_new(rval, "reconnect_pending", json_boolean(router_inst->reconnect_pending));
|
||||
|
||||
json_t* ev = json_object();
|
||||
|
||||
for (int i = 0; i <= MAX_EVENT_TYPE; i++)
|
||||
{
|
||||
json_object_set_new(ev, event_names[i], json_integer(router_inst->stats.events[i]));
|
||||
}
|
||||
|
||||
if (router_inst->mariadb10_compat)
|
||||
{
|
||||
/* Display MariaDB 10 new events */
|
||||
for (int i = MARIADB_NEW_EVENTS_BEGIN; i <= MAX_EVENT_TYPE_MARIADB10; i++)
|
||||
{
|
||||
json_object_set_new(ev, event_names_mariadb10[(i - MARIADB_NEW_EVENTS_BEGIN)],
|
||||
json_integer(router_inst->stats.events[i]));
|
||||
}
|
||||
}
|
||||
|
||||
json_object_set_new(rval, "event_types", ev);
|
||||
|
||||
if (router_inst->slaves)
|
||||
{
|
||||
json_t* arr = json_array();
|
||||
spinlock_acquire(&router_inst->lock);
|
||||
|
||||
for (ROUTER_SLAVE *session = router_inst->slaves; session; session = session->next)
|
||||
{
|
||||
json_t* slave = json_object();
|
||||
minno = session->stats.minno;
|
||||
min30 = 0.0;
|
||||
min15 = 0.0;
|
||||
min10 = 0.0;
|
||||
min5 = 0.0;
|
||||
for (int j = 0; j < BLR_NSTATS_MINUTES; j++)
|
||||
{
|
||||
minno--;
|
||||
if (minno < 0)
|
||||
{
|
||||
minno += BLR_NSTATS_MINUTES;
|
||||
}
|
||||
min30 += session->stats.minavgs[minno];
|
||||
if (j < 15)
|
||||
{
|
||||
min15 += session->stats.minavgs[minno];
|
||||
}
|
||||
if (j < 10)
|
||||
{
|
||||
min10 += session->stats.minavgs[minno];
|
||||
}
|
||||
if (j < 5)
|
||||
{
|
||||
min5 += session->stats.minavgs[minno];
|
||||
}
|
||||
}
|
||||
min30 /= 30.0;
|
||||
min15 /= 15.0;
|
||||
min10 /= 10.0;
|
||||
min5 /= 5.0;
|
||||
|
||||
json_object_set_new(rval, "server_id", json_integer(session->serverid));
|
||||
|
||||
if (session->hostname)
|
||||
{
|
||||
json_object_set_new(rval, "hostname", json_string(session->hostname));
|
||||
}
|
||||
if (session->uuid)
|
||||
{
|
||||
json_object_set_new(rval, "uuid", json_string(session->uuid));
|
||||
}
|
||||
|
||||
json_object_set_new(rval, "address", json_string(session->dcb->remote));
|
||||
json_object_set_new(rval, "port", json_integer(dcb_get_port(session->dcb)));
|
||||
json_object_set_new(rval, "user", json_string(session->dcb->user));
|
||||
json_object_set_new(rval, "ssl_enabled", json_boolean(session->dcb->ssl));
|
||||
json_object_set_new(rval, "state", json_string(blrs_states[session->state]));
|
||||
json_object_set_new(rval, "next_sequence", json_integer(session->seqno));
|
||||
json_object_set_new(rval, "binlog_file", json_string(session->binlogfile));
|
||||
json_object_set_new(rval, "binlog_pos", json_integer(session->binlog_pos));
|
||||
json_object_set_new(rval, "crc", json_boolean(!session->nocrc));
|
||||
|
||||
json_object_set_new(rval, "requests", json_integer(session->stats.n_requests));
|
||||
json_object_set_new(rval, "events_sent", json_integer(session->stats.n_events));
|
||||
json_object_set_new(rval, "bytes_sent", json_integer(session->stats.n_bytes));
|
||||
json_object_set_new(rval, "data_bursts", json_integer(session->stats.n_bursts));
|
||||
|
||||
if (router_inst->send_slave_heartbeat)
|
||||
{
|
||||
json_object_set_new(rval, "heartbeat_period", json_integer(session->heartbeat));
|
||||
}
|
||||
|
||||
minno = session->stats.minno - 1;
|
||||
if (minno == -1)
|
||||
{
|
||||
minno += BLR_NSTATS_MINUTES;
|
||||
}
|
||||
|
||||
if (session->lastEventTimestamp
|
||||
&& router_inst->lastEventTimestamp && session->lastEventReceived != HEARTBEAT_EVENT)
|
||||
{
|
||||
unsigned long seconds_behind;
|
||||
time_t session_last_event = (time_t)session->lastEventTimestamp;
|
||||
|
||||
if (router_inst->lastEventTimestamp > session->lastEventTimestamp)
|
||||
{
|
||||
seconds_behind = router_inst->lastEventTimestamp - session->lastEventTimestamp;
|
||||
}
|
||||
else
|
||||
{
|
||||
seconds_behind = 0;
|
||||
}
|
||||
|
||||
localtime_r(&session_last_event, &tm);
|
||||
asctime_r(&tm, buf);
|
||||
trim(buf);
|
||||
json_object_set_new(rval, "last_binlog_event_timestamp", json_string(buf));
|
||||
json_object_set_new(rval, "seconds_behind_master", json_integer(seconds_behind));
|
||||
}
|
||||
|
||||
const char *mode = "connected";
|
||||
|
||||
if (session->state)
|
||||
{
|
||||
if ((session->cstate & CS_WAIT_DATA) == CS_WAIT_DATA)
|
||||
{
|
||||
mode = "wait-for-data";
|
||||
}
|
||||
else
|
||||
{
|
||||
mode = "catchup";
|
||||
}
|
||||
}
|
||||
|
||||
json_object_set_new(slave, "mode", json_string(mode));
|
||||
|
||||
json_array_append_new(arr, slave);
|
||||
}
|
||||
spinlock_release(&router_inst->lock);
|
||||
|
||||
json_object_set_new(rval, "slaves", arr);
|
||||
|
||||
}
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
||||
/**
|
||||
* Client Reply routine - in this case this is a message from the
|
||||
* master server, It should be sent to the state machine that manages
|
||||
|
@ -97,8 +97,8 @@ int main(int argc, char **argv)
|
||||
|
||||
set_libdir(MXS_STRDUP_A(".."));
|
||||
service = service_alloc("test_service", "binlogrouter");
|
||||
service->credentials.name = MXS_STRDUP_A("foo");
|
||||
service->credentials.authdata = MXS_STRDUP_A("bar");
|
||||
strcpy(service->credentials.name, "foo");
|
||||
strcpy(service->credentials.authdata, "bar");
|
||||
|
||||
{
|
||||
char *lasts;
|
||||
|
@ -49,6 +49,7 @@ static void closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_sessi
|
||||
static void freeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session);
|
||||
static int execute(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *queue);
|
||||
static void diagnostics(MXS_ROUTER *instance, DCB *dcb);
|
||||
static json_t* diagnostics_json(const MXS_ROUTER *instance);
|
||||
static uint64_t getCapabilities(MXS_ROUTER* instance);
|
||||
|
||||
extern int execute_cmd(CLI_SESSION *cli);
|
||||
@ -78,6 +79,7 @@ MXS_MODULE* MXS_CREATE_MODULE()
|
||||
freeSession,
|
||||
execute,
|
||||
diagnostics,
|
||||
diagnostics_json,
|
||||
NULL,
|
||||
NULL,
|
||||
getCapabilities,
|
||||
@ -290,6 +292,11 @@ diagnostics(MXS_ROUTER *instance, DCB *dcb)
|
||||
return; /* Nothing to do currently */
|
||||
}
|
||||
|
||||
static json_t* diagnostics_json(const MXS_ROUTER *instance)
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static uint64_t getCapabilities(MXS_ROUTER *instance)
|
||||
{
|
||||
return RCAP_TYPE_NONE;
|
||||
|
@ -48,6 +48,7 @@ static void closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_sessi
|
||||
static void freeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session);
|
||||
static int execute(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *queue);
|
||||
static void diagnostics(MXS_ROUTER *instance, DCB *dcb);
|
||||
static json_t* diagnostics_json(const MXS_ROUTER *instance);
|
||||
static uint64_t getCapabilities(MXS_ROUTER* instance);
|
||||
|
||||
extern int execute_cmd(CLI_SESSION *cli);
|
||||
@ -77,6 +78,7 @@ MXS_MODULE* MXS_CREATE_MODULE()
|
||||
freeSession,
|
||||
execute,
|
||||
diagnostics,
|
||||
diagnostics_json,
|
||||
NULL,
|
||||
NULL,
|
||||
getCapabilities,
|
||||
@ -293,6 +295,17 @@ diagnostics(MXS_ROUTER *instance, DCB *dcb)
|
||||
return; /* Nothing to do currently */
|
||||
}
|
||||
|
||||
/**
|
||||
* Display router diagnostics
|
||||
*
|
||||
* @param instance Instance of the router
|
||||
* @param dcb DCB to send diagnostics to
|
||||
*/
|
||||
static json_t* diagnostics_json(const MXS_ROUTER *instance)
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static uint64_t getCapabilities(MXS_ROUTER* instance)
|
||||
{
|
||||
return RCAP_TYPE_NONE;
|
||||
|
@ -1513,6 +1513,34 @@ static void alterMonitor(DCB *dcb, MXS_MONITOR *monitor, char *v1, char *v2, cha
|
||||
|
||||
}
|
||||
|
||||
static void alterService(DCB *dcb, SERVICE *service, char *v1, char *v2, char *v3,
|
||||
char *v4, char *v5, char *v6, char *v7, char *v8, char *v9,
|
||||
char *v10, char *v11)
|
||||
{
|
||||
char *values[11] = {v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11};
|
||||
const int items = sizeof(values) / sizeof(values[0]);
|
||||
|
||||
for (int i = 0; i < items && values[i]; i++)
|
||||
{
|
||||
char *key = values[i];
|
||||
char *value = strchr(key, '=');
|
||||
|
||||
if (value)
|
||||
{
|
||||
*value++ = '\0';
|
||||
|
||||
if (!runtime_alter_service(service, key, value))
|
||||
{
|
||||
dcb_printf(dcb, "Error: Bad key-value parameter: %s=%s\n", key, value);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
dcb_printf(dcb, "Error: not a key-value parameter: %s\n", values[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct subcommand alteroptions[] =
|
||||
{
|
||||
{
|
||||
@ -1574,6 +1602,37 @@ struct subcommand alteroptions[] =
|
||||
ARG_TYPE_STRING, ARG_TYPE_STRING, ARG_TYPE_STRING, ARG_TYPE_STRING
|
||||
}
|
||||
},
|
||||
{
|
||||
"service", 2, 12, alterService,
|
||||
"Alter service parameters",
|
||||
"Usage: alter service NAME KEY=VALUE ...\n"
|
||||
"\n"
|
||||
"Parameters:\n"
|
||||
"NAME Service name\n"
|
||||
"KEY=VALUE List of `key=value` pairs separated by spaces\n"
|
||||
"\n"
|
||||
"All services support the following values for KEY:\n"
|
||||
"user Username used when connecting to servers\n"
|
||||
"password Password used when connecting to servers\n"
|
||||
"enable_root_user Allow root user access through this service\n"
|
||||
"max_retry_interval Maximum restart retry interval\n"
|
||||
"max_connections Maximum connection limit\n"
|
||||
"connection_timeout Client idle timeout in seconds\n"
|
||||
"auth_all_servers Retrieve authentication data from all servers\n"
|
||||
"strip_db_esc Strip escape characters from database names\n"
|
||||
"localhost_match_wildcard_host Match wildcard host to 'localhost' address\n"
|
||||
"version_string The version string given to client connections\n"
|
||||
"weightby Weighting parameter name\n"
|
||||
"log_auth_warnings Log authentication warnings\n"
|
||||
"retry_on_failure Retry service start on failure\n"
|
||||
"\n"
|
||||
"Example: alter service my-service user=maxuser password=maxpwd",
|
||||
{
|
||||
ARG_TYPE_SERVICE, ARG_TYPE_STRING, ARG_TYPE_STRING, ARG_TYPE_STRING,
|
||||
ARG_TYPE_STRING, ARG_TYPE_STRING, ARG_TYPE_STRING, ARG_TYPE_STRING,
|
||||
ARG_TYPE_STRING, ARG_TYPE_STRING, ARG_TYPE_STRING, ARG_TYPE_STRING
|
||||
}
|
||||
},
|
||||
{
|
||||
EMPTY_OPTION
|
||||
}
|
||||
|
@ -163,6 +163,33 @@ void HintRouter::diagnostics(DCB* pOut)
|
||||
dcb_printf(pOut, "\tQueries routed to all servers: %d\n", m_routed_to_all);
|
||||
}
|
||||
|
||||
json_t* HintRouter::diagnostics_json() const
|
||||
{
|
||||
HR_ENTRY();
|
||||
|
||||
json_t* rval = json_object();
|
||||
json_t* arr = json_array();
|
||||
|
||||
for (int i = 0; default_action_values[i].name; i++)
|
||||
{
|
||||
if (default_action_values[i].enum_value == (uint64_t)m_default_action)
|
||||
{
|
||||
json_array_append_new(arr, json_string(default_action_values[i].name));
|
||||
}
|
||||
}
|
||||
|
||||
json_object_set_new(rval, "default_action", arr);
|
||||
json_object_set_new(rval, "default_server", json_string(m_default_server.c_str()));
|
||||
json_object_set_new(rval, "max_slave_connections", json_integer(m_max_slaves));
|
||||
json_object_set_new(rval, "total_slave_connections", json_integer(m_total_slave_conns));
|
||||
json_object_set_new(rval, "route_master", json_integer(m_routed_to_master));
|
||||
json_object_set_new(rval, "route_slave", json_integer(m_routed_to_slave));
|
||||
json_object_set_new(rval, "route_named_server", json_integer(m_routed_to_named));
|
||||
json_object_set_new(rval, "route_all", json_integer(m_routed_to_all));
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
||||
Dcb HintRouter::connect_to_backend(MXS_SESSION* session, SERVER_REF* sref,
|
||||
HintRouterSession::BackendMap* all_backends)
|
||||
{
|
||||
|
@ -23,6 +23,7 @@ public:
|
||||
static HintRouter* create(SERVICE* pService, char** pzOptions);
|
||||
HintRouterSession* newSession(MXS_SESSION *pSession);
|
||||
void diagnostics(DCB* pOut);
|
||||
json_t* diagnostics_json() const;
|
||||
uint64_t getCapabilities() const
|
||||
{
|
||||
return RCAP_TYPE_NONE;
|
||||
|
@ -70,6 +70,7 @@ static void closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_sessi
|
||||
static void freeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session);
|
||||
static int execute(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *queue);
|
||||
static void diagnostics(MXS_ROUTER *instance, DCB *dcb);
|
||||
static json_t* diagnostics_json(const MXS_ROUTER *instance);
|
||||
static uint64_t getCapabilities(MXS_ROUTER* instance);
|
||||
static void handleError(MXS_ROUTER *instance,
|
||||
MXS_ROUTER_SESSION *router_session,
|
||||
@ -103,6 +104,7 @@ MXS_MODULE* MXS_CREATE_MODULE()
|
||||
freeSession,
|
||||
execute,
|
||||
diagnostics,
|
||||
diagnostics_json,
|
||||
NULL,
|
||||
handleError,
|
||||
getCapabilities,
|
||||
@ -374,6 +376,18 @@ diagnostics(MXS_ROUTER *instance, DCB *dcb)
|
||||
return; /* Nothing to do currently */
|
||||
}
|
||||
|
||||
/**
|
||||
* Display router diagnostics
|
||||
*
|
||||
* @param instance Instance of the router
|
||||
* @param dcb DCB to send diagnostics to
|
||||
*/
|
||||
static json_t*
|
||||
diagnostics_json(const MXS_ROUTER *instance)
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/**
|
||||
* Capabilities interface for the rotuer
|
||||
*
|
||||
|
@ -94,6 +94,7 @@ static void closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_sessio
|
||||
static void freeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session);
|
||||
static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *queue);
|
||||
static void diagnostics(MXS_ROUTER *instance, DCB *dcb);
|
||||
static json_t* diagnostics_json(const MXS_ROUTER *instance);
|
||||
static void clientReply(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *queue,
|
||||
DCB *backend_dcb);
|
||||
static void handleError(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *errbuf,
|
||||
@ -123,6 +124,7 @@ MXS_MODULE* MXS_CREATE_MODULE()
|
||||
freeSession,
|
||||
routeQuery,
|
||||
diagnostics,
|
||||
diagnostics_json,
|
||||
clientReply,
|
||||
handleError,
|
||||
getCapabilities,
|
||||
@ -619,7 +621,7 @@ static void
|
||||
diagnostics(MXS_ROUTER *router, DCB *dcb)
|
||||
{
|
||||
ROUTER_INSTANCE *router_inst = (ROUTER_INSTANCE *) router;
|
||||
char *weightby;
|
||||
const char *weightby = serviceGetWeightingParameter(router_inst->service);
|
||||
|
||||
dcb_printf(dcb, "\tNumber of router sessions: %d\n",
|
||||
router_inst->stats.n_sessions);
|
||||
@ -627,8 +629,7 @@ diagnostics(MXS_ROUTER *router, DCB *dcb)
|
||||
router_inst->service->stats.n_current);
|
||||
dcb_printf(dcb, "\tNumber of queries forwarded: %d\n",
|
||||
router_inst->stats.n_queries);
|
||||
if ((weightby = serviceGetWeightingParameter(router_inst->service))
|
||||
!= NULL)
|
||||
if (*weightby)
|
||||
{
|
||||
dcb_printf(dcb, "\tConnection distribution based on %s "
|
||||
"server parameter.\n",
|
||||
@ -645,6 +646,31 @@ diagnostics(MXS_ROUTER *router, DCB *dcb)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Display router diagnostics
|
||||
*
|
||||
* @param instance Instance of the router
|
||||
* @param dcb DCB to send diagnostics to
|
||||
*/
|
||||
static json_t* diagnostics_json(const MXS_ROUTER *router)
|
||||
{
|
||||
ROUTER_INSTANCE *router_inst = (ROUTER_INSTANCE *)router;
|
||||
json_t* rval = json_object();
|
||||
|
||||
json_object_set_new(rval, "connections", json_integer(router_inst->stats.n_sessions));
|
||||
json_object_set_new(rval, "current_connections", json_integer(router_inst->service->stats.n_current));
|
||||
json_object_set_new(rval, "queries", json_integer(router_inst->stats.n_queries));
|
||||
|
||||
const char *weightby = serviceGetWeightingParameter(router_inst->service);
|
||||
|
||||
if (*weightby)
|
||||
{
|
||||
json_object_set_new(rval, "weightby", json_string(weightby));
|
||||
}
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
||||
/**
|
||||
* Client Reply routine
|
||||
*
|
||||
|
@ -75,6 +75,7 @@ static void closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *session);
|
||||
static void freeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *session);
|
||||
static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *session, GWBUF *queue);
|
||||
static void diagnostics(MXS_ROUTER *instance, DCB *dcb);
|
||||
static json_t* diagnostics_json(const MXS_ROUTER *instance);
|
||||
static void clientReply(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *queue,
|
||||
DCB *backend_dcb);
|
||||
static void handleError(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session,
|
||||
@ -151,6 +152,7 @@ MXS_MODULE *MXS_CREATE_MODULE()
|
||||
freeSession,
|
||||
routeQuery,
|
||||
diagnostics,
|
||||
diagnostics_json,
|
||||
clientReply,
|
||||
handleError,
|
||||
getCapabilities,
|
||||
@ -649,7 +651,7 @@ static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session,
|
||||
static void diagnostics(MXS_ROUTER *instance, DCB *dcb)
|
||||
{
|
||||
ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance;
|
||||
char *weightby;
|
||||
const char *weightby = serviceGetWeightingParameter(router->service);
|
||||
double master_pct = 0.0, slave_pct = 0.0, all_pct = 0.0;
|
||||
|
||||
dcb_printf(dcb, "\n");
|
||||
@ -693,7 +695,7 @@ static void diagnostics(MXS_ROUTER *instance, DCB *dcb)
|
||||
dcb_printf(dcb, "\tNumber of queries forwarded to all: %" PRIu64 " (%.2f%%)\n",
|
||||
router->stats.n_all, all_pct);
|
||||
|
||||
if ((weightby = serviceGetWeightingParameter(router->service)) != NULL)
|
||||
if (*weightby)
|
||||
{
|
||||
dcb_printf(dcb, "\tConnection distribution based on %s "
|
||||
"server parameter.\n",
|
||||
@ -711,6 +713,57 @@ static void diagnostics(MXS_ROUTER *instance, DCB *dcb)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Diagnostics routine (API)
|
||||
*
|
||||
* Print query router statistics to the DCB passed in
|
||||
*
|
||||
* @param instance The router instance
|
||||
* @param dcb The DCB for diagnostic output
|
||||
*/
|
||||
static json_t* diagnostics_json(const MXS_ROUTER *instance)
|
||||
{
|
||||
ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance;
|
||||
|
||||
json_t* rval = json_object();
|
||||
|
||||
json_object_set_new(rval, "use_sql_variables_in",
|
||||
json_string(mxs_target_to_str(router->rwsplit_config.use_sql_variables_in)));
|
||||
json_object_set_new(rval, "slave_selection_criteria",
|
||||
json_string(select_criteria_to_str(router->rwsplit_config.slave_selection_criteria)));
|
||||
json_object_set_new(rval, "master_failure_mode",
|
||||
json_string(failure_mode_to_str(router->rwsplit_config.master_failure_mode)));
|
||||
json_object_set_new(rval, "max_slave_replication_lag",
|
||||
json_integer(router->rwsplit_config.max_slave_replication_lag));
|
||||
json_object_set_new(rval, "retry_failed_reads",
|
||||
json_boolean(router->rwsplit_config.retry_failed_reads));
|
||||
json_object_set_new(rval, "strict_multi_stmt",
|
||||
json_boolean(router->rwsplit_config.strict_multi_stmt));
|
||||
json_object_set_new(rval, "disable_sescmd_history",
|
||||
json_boolean(router->rwsplit_config.disable_sescmd_history));
|
||||
json_object_set_new(rval, "max_sescmd_history",
|
||||
json_integer(router->rwsplit_config.max_sescmd_history));
|
||||
json_object_set_new(rval, "master_accept_reads",
|
||||
json_boolean(router->rwsplit_config.master_accept_reads));
|
||||
|
||||
|
||||
json_object_set_new(rval, "connections", json_integer(router->stats.n_sessions));
|
||||
json_object_set_new(rval, "current_connections", json_integer(router->service->stats.n_current));
|
||||
json_object_set_new(rval, "queries", json_integer(router->stats.n_queries));
|
||||
json_object_set_new(rval, "route_master", json_integer(router->stats.n_master));
|
||||
json_object_set_new(rval, "route_slave", json_integer(router->stats.n_slave));
|
||||
json_object_set_new(rval, "route_all", json_integer(router->stats.n_all));
|
||||
|
||||
const char *weightby = serviceGetWeightingParameter(router->service);
|
||||
|
||||
if (*weightby)
|
||||
{
|
||||
json_object_set_new(rval, "weightby", json_string(weightby));
|
||||
}
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Check if we have received a complete reply from the backend
|
||||
*
|
||||
|
@ -333,6 +333,32 @@ void SchemaRouter::diagnostics(DCB* dcb)
|
||||
dcb_printf(dcb, "\n");
|
||||
}
|
||||
|
||||
json_t* SchemaRouter::diagnostics_json() const
|
||||
{
|
||||
double sescmd_pct = m_stats.n_sescmd != 0 ?
|
||||
100.0 * ((double)m_stats.n_sescmd / (double)m_stats.n_queries) :
|
||||
0.0;
|
||||
|
||||
json_t* rval = json_object();
|
||||
json_object_set_new(rval, "queries", json_integer(m_stats.n_queries));
|
||||
json_object_set_new(rval, "sescmd_percentage", json_real(sescmd_pct));
|
||||
json_object_set_new(rval, "longest_sescmd_chain", json_integer(m_stats.longest_sescmd));
|
||||
json_object_set_new(rval, "times_sescmd_limit_exceeded", json_integer(m_stats.n_hist_exceeded));
|
||||
|
||||
/** Session time statistics */
|
||||
if (m_stats.sessions > 0)
|
||||
{
|
||||
json_object_set_new(rval, "longest_session", json_real(m_stats.ses_longest));
|
||||
json_object_set_new(rval, "shortest_session", json_real(m_stats.ses_shortest));
|
||||
json_object_set_new(rval, "average_session", json_real(m_stats.ses_average));
|
||||
}
|
||||
|
||||
json_object_set_new(rval, "shard_map_hits", json_integer(m_stats.shmap_cache_hit));
|
||||
json_object_set_new(rval, "shard_map_misses", json_integer(m_stats.shmap_cache_miss));
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
||||
uint64_t SchemaRouter::getCapabilities()
|
||||
{
|
||||
return RCAP_TYPE_NONE;
|
||||
|
@ -38,6 +38,7 @@ public:
|
||||
static SchemaRouter* create(SERVICE* pService, char** pzOptions);
|
||||
SchemaRouterSession* newSession(MXS_SESSION* pSession);
|
||||
void diagnostics(DCB* pDcb);
|
||||
json_t* diagnostics_json() const;
|
||||
uint64_t getCapabilities();
|
||||
|
||||
private:
|
||||
|
Reference in New Issue
Block a user