Merge branch 'develop' into MAX-324

This commit is contained in:
Markus Makela
2015-02-16 14:58:35 +02:00
38 changed files with 32195 additions and 1388 deletions

View File

@ -18,39 +18,48 @@
/**
* @file fwfilter.c
* Firewall Filter
* @author Markus Mäkelä
* @date 13.2.2015
* @version 1.0.0
* @copyright GPLv2
* @section secDesc Firewall Filter
*
* A filter that acts as a firewall, denying queries that do not meet a set of rules.
*
* Filter configuration parameters:
*
*@code{.unparsed}
* rules=<path to file> Location of the rule file
*
*@endcode
* Rules are defined in a separate rule file that lists all the rules and the users to whom the rules are applied.
* Rules follow a simple syntax that denies the queries that meet the requirements of the rules.
* For example, to define a rule denying users from accessing the column 'salary' between
* the times 15:00 and 17:00, the following rule is to be configured into the configuration file:
*
*@code{.unparsed}
* rule block_salary deny columns salary at_times 15:00:00-17:00:00
*
*@endcode
* The users are matched by username and network address. Wildcard values can be provided by using the '%' character.
* For example, to apply this rule to users John, connecting from any address
* that starts with the octets 198.168.%, and Jane, connecting from the address 192.168.0.1:
*
*@code{.unparsed}
* users John@192.168.% Jane@192.168.0.1 match any rules block_salary
*@endcode
*
*
* The 'match' keyword controls the way rules are matched. If it is set to 'any' the first active rule that is triggered will cause the query to be denied.
* The 'match' keyword controls the way rules are matched. If it is set to
* 'any' the first active rule that is triggered will cause the query to be denied.
* If it is set to 'all' all the active rules need to match before the query is denied.
*
* Rule syntax
*
* @subsection secRule Rule syntax
* This is the syntax used when defining rules.
*@code{.unparsed}
* rule NAME deny [wildcard | columns VALUE ... | regex REGEX | limit_queries COUNT TIMEPERIOD HOLDOFF | no_where_clause] [at_times VALUE...] [on_queries [select|update|insert|delete]]
*
* User syntax
*
*@endcode
* @subsection secUser User syntax
* This is the syntax used when linking users to rules. It takes one or more
* combinations of username and network, either the value any or all,
* depending on how you want to match the rules, and one or more rule names.
*@code{.unparsed}
* users NAME ... match [any|all] rules RULE ...
*
*@endcode
*/
#include <my_config.h>
#include <stdint.h>
@ -109,13 +118,13 @@ static FILTER_OBJECT MyObject = {
* Rule types
*/
typedef enum {
RT_UNDEFINED = 0x00,
RT_COLUMN,
RT_THROTTLE,
RT_PERMISSION,
RT_WILDCARD,
RT_REGEX,
RT_CLAUSE
RT_UNDEFINED = 0x00, /*< Undefined rule */
RT_COLUMN, /*< Column name rule*/
RT_THROTTLE, /*< Query speed rule */
RT_PERMISSION, /*< Simple denying rule */
RT_WILDCARD, /*< Wildcard denial rule */
RT_REGEX, /*< Regex matching rule */
RT_CLAUSE /*< WHERE-clause requirement rule */
}ruletype_t;
const char* rule_names[] = {
@ -133,25 +142,31 @@ const char* rule_names[] = {
* Linked list of strings.
*/
typedef struct strlink_t{
struct strlink_t *next;
char* value;
struct strlink_t *next; /*< Next node in the list */
char* value; /*< Value of the current node */
}STRLINK;
/**
* A structure defining a range of time
*/
typedef struct timerange_t{
struct timerange_t* next;
struct tm start;
struct tm end;
struct timerange_t* next; /*< Next node in the list */
struct tm start; /*< Start of the time range */
struct tm end; /*< End of the time range */
}TIMERANGE;
/**
* Query speed measurement and limitation structure
*/
typedef struct queryspeed_t{
time_t first_query;
time_t triggered;
double period;
double cooldown;
int count;
int limit;
long id;
struct queryspeed_t* next;
time_t first_query; /*< Time when the first query occurred */
time_t triggered; /*< Time when the limit was exceeded */
double period; /*< Measurement interval in seconds */
double cooldown;/*< Time the user is denied access for */
int count; /*< Number of queries done */
int limit; /*< Maximum number of queries */
long id; /*< Unique id of the rule */
struct queryspeed_t* next; /*< Next node in the list */
}QUERYSPEED;
@ -162,65 +177,70 @@ typedef struct queryspeed_t{
* This allows to match an arbitrary set of rules against a user.
*/
typedef struct rule_t{
void* data;
char* name;
ruletype_t type;
skygw_query_op_t on_queries;
bool allow;
int times_matched;
TIMERANGE* active;
void* data; /*< Actual implementation of the rule */
char* name; /*< Name of the rule */
ruletype_t type;/*< Type of the rule */
skygw_query_op_t on_queries;/*< Types of queries to inspect */
bool allow;/*< Allow or deny the query if this rule matches */
int times_matched;/*< Number of times this rule has been matched */
TIMERANGE* active;/*< List of times when this rule is active */
}RULE;
/**
* Linked list of pointers to a global pool of RULE structs
*/
typedef struct rulelist_t{
RULE* rule;
struct rulelist_t* next;
RULE* rule; /*< The rule structure */
struct rulelist_t* next;/*< Next node in the list */
}RULELIST;
typedef struct user_t{
char* name;
SPINLOCK* lock;
QUERYSPEED* qs_limit;
RULELIST* rules_or;
RULELIST* rules_and;
char* name;/*< Name of the user */
SPINLOCK* lock;/*< User spinlock */
QUERYSPEED* qs_limit;/*< The query speed structure unique to this user */
RULELIST* rules_or;/*< If any of these rules match the action is triggered */
RULELIST* rules_and;/*< All of these rules must match for the action to trigger */
}USER;
/**
* Linked list of IP adresses and subnet masks
*/
typedef struct iprange_t{
struct iprange_t* next;
uint32_t ip;
uint32_t mask;
struct iprange_t* next;/*< Next node in the list */
uint32_t ip;/*< IP address */
uint32_t mask;/*< Network mask */
}IPRANGE;
/**
* The Firewall filter instance.
*/
typedef struct {
HASHTABLE* htable;
RULELIST* rules;
STRLINK* userstrings;
bool def_op;
SPINLOCK* lock;
long idgen; /**UID generator*/
HASHTABLE* htable; /*< User hashtable */
RULELIST* rules;/*< List of all the rules */
STRLINK* userstrings;/*< Temporary list of raw strings of users */
bool def_op;/*< Default operation mode, defaults to deny */
SPINLOCK* lock;/*< Instance spinlock */
long idgen; /*< UID generator */
} FW_INSTANCE;
/**
* The session structure for Firewall filter.
*/
typedef struct {
SESSION* session;
char* errmsg;
DOWNSTREAM down;
UPSTREAM up;
SESSION* session;/*< Client session structure */
char* errmsg;/*< Rule specific error message */
DOWNSTREAM down;/*< Next object in the downstream chain */
UPSTREAM up;/*< Next object in the upstream chain */
} FW_SESSION;
static int hashkeyfun(void* key);
static int hashcmpfun (void *, void *);
/**
* Hashtable key hashing function. Uses a simple string hashing algorithm.
* @param key Key to hash
* @return The hash value of the key
*/
static int hashkeyfun(
void* key)
{
@ -232,9 +252,16 @@ static int hashkeyfun(
while((c = *ptr++)){
hash = c + (hash << 6) + (hash << 16) - hash;
}
return (int)hash > 0 ? hash : -hash;
return hash;
}
/**
* Hashtable entry comparison function. Does a string matching operation on the
* two keys. This function assumes the values are pointers to null-terminated
* character arrays.
* @param v1 The first key
* @param v2 The second key
* @return Zero if the values are equal. Non-zero in other cases.
*/
static int hashcmpfun(
void* v1,
void* v2)
@ -245,19 +272,6 @@ static int hashcmpfun(
return strcmp(i1,i2);
}
static void* hstrdup(void* fval)
{
char* str = (char*)fval;
return strdup(str);
}
static void* hstrfree(void* fval)
{
free (fval);
return NULL;
}
void* rlistdup(void* fval)
{
@ -588,7 +602,7 @@ RULE* find_rule(char* tok, FW_INSTANCE* instance)
}
rlist = rlist->next;
}
skygw_log_write(LOGFILE_ERROR, "fwfilter: Rule not found: %s",tok);
skygw_log_write(LOGFILE_ERROR, "Error : Rule not found: %s",tok);
return NULL;
}
@ -602,6 +616,10 @@ void add_users(char* rule, FW_INSTANCE* instance)
assert(rule != NULL && instance != NULL);
STRLINK* link = calloc(1,sizeof(STRLINK));
if(link == NULL){
skygw_log_write(LOGFILE_ERROR,"Error : Memory allocation failed");
return;
}
link->next = instance->userstrings;
link->value = strdup(rule);
instance->userstrings = link;
@ -619,8 +637,9 @@ void link_rules(char* rule, FW_INSTANCE* instance)
/**Apply rules to users*/
bool match_any;
bool match_any = true;
char *tok, *ruleptr, *userptr, *modeptr;
char *saveptr = NULL;
RULELIST* rulelist = NULL;
userptr = strstr(rule,"users ");
@ -635,10 +654,10 @@ void link_rules(char* rule, FW_INSTANCE* instance)
*modeptr++ = '\0';
*ruleptr++ = '\0';
tok = strtok(modeptr," ");
if(strcmp(tok,"match") == 0){
tok = strtok(NULL," ");
tok = strtok_r(modeptr," ",&saveptr);
if(tok && strcmp(tok,"match") == 0){
tok = strtok_r(NULL," ",&saveptr);
if(strcmp(tok,"any") == 0){
match_any = true;
}else if(strcmp(tok,"all") == 0){
@ -649,8 +668,8 @@ void link_rules(char* rule, FW_INSTANCE* instance)
}
}
tok = strtok(ruleptr," ");
tok = strtok(NULL," ");
tok = strtok_r(ruleptr," ",&saveptr);
tok = strtok_r(NULL," ",&saveptr);
while(tok)
{
@ -664,7 +683,7 @@ void link_rules(char* rule, FW_INSTANCE* instance)
rulelist = tmp_rl;
}
tok = strtok(NULL," ");
tok = strtok_r(NULL," ",&saveptr);
}
/**
@ -672,8 +691,8 @@ void link_rules(char* rule, FW_INSTANCE* instance)
*/
*(ruleptr) = '\0';
userptr = strtok(rule," ");
userptr = strtok(NULL," ");
userptr = strtok_r(rule," ",&saveptr);
userptr = strtok_r(NULL," ",&saveptr);
while(userptr)
{
@ -720,10 +739,16 @@ void link_rules(char* rule, FW_INSTANCE* instance)
(void *)userptr,
(void *)user);
userptr = strtok(NULL," ");
userptr = strtok_r(NULL," ",&saveptr);
}
while(rulelist)
{
RULELIST *tmp = rulelist;
rulelist = rulelist->next;
free(tmp);
}
}
@ -737,7 +762,8 @@ void parse_rule(char* rule, FW_INSTANCE* instance)
ss_dassert(rule != NULL && instance != NULL);
char *rulecpy = strdup(rule);
char *tok = strtok(rulecpy," ,");
char *saveptr = NULL;
char *tok = strtok_r(rulecpy," ,",&saveptr);
bool allow,deny,mode;
RULE* ruledef = NULL;
@ -745,7 +771,7 @@ void parse_rule(char* rule, FW_INSTANCE* instance)
if(strcmp("rule",tok) == 0){ /**Define a new rule*/
tok = strtok(NULL," ,");
tok = strtok_r(NULL," ,",&saveptr);
if(tok == NULL) goto retblock;
@ -780,8 +806,13 @@ void parse_rule(char* rule, FW_INSTANCE* instance)
add_users(rule, instance);
goto retblock;
}
else
{
skygw_log_write(LOGFILE_ERROR,"Error : Unknown token in rule file: %s",tok);
goto retblock;
}
tok = strtok(NULL, " ,");
tok = strtok_r(NULL, " ,",&saveptr);
if((allow = (strcmp(tok,"allow") == 0)) ||
@ -790,7 +821,7 @@ void parse_rule(char* rule, FW_INSTANCE* instance)
mode = allow ? true:false;
ruledef->allow = mode;
ruledef->type = RT_PERMISSION;
tok = strtok(NULL, " ,");
tok = strtok_r(NULL, " ,",&saveptr);
while(tok){
@ -802,13 +833,13 @@ void parse_rule(char* rule, FW_INSTANCE* instance)
{
STRLINK *tail = NULL,*current;
ruledef->type = RT_COLUMN;
tok = strtok(NULL, " ,");
tok = strtok_r(NULL, " ,",&saveptr);
while(tok && strcmp(tok,"at_times") != 0){
current = malloc(sizeof(STRLINK));
current->value = strdup(tok);
current->next = tail;
tail = current;
tok = strtok(NULL, " ,");
tok = strtok_r(NULL, " ,",&saveptr);
}
ruledef->data = (void*)tail;
@ -818,7 +849,7 @@ void parse_rule(char* rule, FW_INSTANCE* instance)
else if(strcmp(tok,"at_times") == 0)
{
tok = strtok(NULL, " ,");
tok = strtok_r(NULL, " ,",&saveptr);
TIMERANGE *tr = NULL;
while(tok){
TIMERANGE *tmp = parse_time(tok,instance);
@ -828,7 +859,7 @@ void parse_rule(char* rule, FW_INSTANCE* instance)
}
tmp->next = tr;
tr = tmp;
tok = strtok(NULL, " ,");
tok = strtok_r(NULL, " ,",&saveptr);
}
ruledef->active = tr;
}
@ -837,21 +868,22 @@ void parse_rule(char* rule, FW_INSTANCE* instance)
bool escaped = false;
regex_t *re;
char* start, *str;
tok = strtok(NULL," ");
tok = strtok_r(NULL," ",&saveptr);
char delim = '\'';
while(*tok == '\'' || *tok == '"'){
delim = *tok;
tok++;
}
start = tok;
while(isspace(*tok) || *tok == '\'' || *tok == '"'){
while(isspace(*tok) || *tok == delim){
tok++;
}
while(true){
if((*tok == '\'' || *tok == '"') && !escaped){
if((*tok == delim) && !escaped){
break;
}
escaped = (*tok == '\\');
@ -895,12 +927,24 @@ void parse_rule(char* rule, FW_INSTANCE* instance)
qs->id = ++instance->idgen;
spinlock_release(instance->lock);
tok = strtok(NULL," ");
tok = strtok_r(NULL," ",&saveptr);
if(tok == NULL){
free(qs);
goto retblock;
}
qs->limit = atoi(tok);
tok = strtok(NULL," ");
tok = strtok_r(NULL," ",&saveptr);
if(tok == NULL){
free(qs);
goto retblock;
}
qs->period = atof(tok);
tok = strtok(NULL," ");
tok = strtok_r(NULL," ",&saveptr);
if(tok == NULL){
free(qs);
goto retblock;
}
qs->cooldown = atof(tok);
ruledef->type = RT_THROTTLE;
ruledef->data = (void*)qs;
@ -912,7 +956,7 @@ void parse_rule(char* rule, FW_INSTANCE* instance)
}
else if(strcmp(tok,"on_operations") == 0)
{
tok = strtok(NULL," ");
tok = strtok_r(NULL," ",&saveptr);
if(!parse_querytypes(tok,ruledef)){
skygw_log_write(LOGFILE_ERROR,
"fwfilter: Invalid query type"
@ -920,7 +964,7 @@ void parse_rule(char* rule, FW_INSTANCE* instance)
,tok);
}
}
tok = strtok(NULL," ,");
tok = strtok_r(NULL," ,",&saveptr);
}
goto retblock;
@ -958,13 +1002,13 @@ createInstance(char **options, FILTER_PARAMETER **params)
spinlock_init(my_instance->lock);
if((ht = hashtable_alloc(7, hashkeyfun, hashcmpfun)) == NULL){
if((ht = hashtable_alloc(100, hashkeyfun, hashcmpfun)) == NULL){
skygw_log_write(LOGFILE_ERROR, "Unable to allocate hashtable.");
free(my_instance);
return NULL;
}
hashtable_memory_fns(ht,hstrdup,NULL,hstrfree,hrulefree);
hashtable_memory_fns(ht,(HASHMEMORYFN)strdup,NULL,(HASHMEMORYFN)free,hrulefree);
my_instance->htable = ht;
my_instance->def_op = true;
@ -979,18 +1023,22 @@ createInstance(char **options, FILTER_PARAMETER **params)
}
}
if(filename == NULL)
{
skygw_log_write(LOGFILE_ERROR, "Unable to find rule file for firewall filter.");
skygw_log_write(LOGFILE_ERROR, "Unable to find rule file for firewall filter. Please provide the path with"
" rules=<path to file>");
hashtable_free(my_instance->htable);
free(my_instance);
return NULL;
}
if((file = fopen(filename,"rb")) == NULL ){
skygw_log_write(LOGFILE_ERROR, "Error while opening rule file for firewall filter.");
free(my_instance);
free(filename);
return NULL;
hashtable_free(my_instance->htable);
free(my_instance);
free(filename);
return NULL;
}
free(filename);
@ -1002,6 +1050,7 @@ createInstance(char **options, FILTER_PARAMETER **params)
if(ferror(file)){
skygw_log_write(LOGFILE_ERROR, "Error while reading rule file for firewall filter.");
fclose(file);
hashtable_free(my_instance->htable);
free(my_instance);
return NULL;
}
@ -1037,9 +1086,7 @@ createInstance(char **options, FILTER_PARAMETER **params)
/**
* Associate a new session with this instance of the filter and opens
* a connection to the server and prepares the exchange and the queue for use.
*
* Associate a new session with this instance of the filter.
*
* @param instance The filter instance data
* @param session The session itself
@ -1370,15 +1417,17 @@ bool rule_matches(FW_INSTANCE* my_instance, FW_SESSION* my_session, GWBUF *queue
spinlock_acquire(user->lock);
queryspeed = user->qs_limit;
spinlock_release(user->lock);
while(queryspeed){
if(queryspeed->id == rule_qs->id){
break;
}
queryspeed = queryspeed->next;
}
if(queryspeed == NULL){
/**No match found*/
@ -1427,7 +1476,7 @@ bool rule_matches(FW_INSTANCE* my_instance, FW_SESSION* my_session, GWBUF *queue
{
queryspeed->first_query = time_now;
}
spinlock_release(user->lock);
break;
case RT_CLAUSE:

View File

@ -11,6 +11,7 @@ add_executable(harness harness_util.c harness_common.c ${CORE})
target_link_libraries(harness_ui fullcore log_manager utils)
target_link_libraries(harness fullcore)
execute_process(COMMAND ${CMAKE_COMMAND} -E copy ${ERRMSG} ${CMAKE_CURRENT_BINARY_DIR})
execute_process(COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_SOURCE_DIR}/harness.cnf ${CMAKE_CURRENT_BINARY_DIR})
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/testdriver.sh ${CMAKE_CURRENT_BINARY_DIR}/testdriver.sh @ONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/hintfilter/hint_testing.cnf ${CMAKE_CURRENT_BINARY_DIR}/hintfilter/hint_testing.cnf)

View File

@ -141,6 +141,7 @@ FILTER_PARAMETER** read_params(int* paramc)
{
char buffer[256];
char* token;
char* saveptr;
char* names[64];
char* values[64];
int pc = 0, do_read = 1, val_len = 0;
@ -157,14 +158,14 @@ FILTER_PARAMETER** read_params(int* paramc)
if(strcmp("done\n",buffer) == 0){
do_read = 0;
}else{
token = strtok(buffer,"=\n");
token = strtok_r(buffer,"=\n",&saveptr);
if(token!=NULL){
val_len = strcspn(token," \n\0");
if((names[pc] = calloc((val_len + 1),sizeof(char))) != NULL){
memcpy(names[pc],token,val_len);
}
}
token = strtok(NULL,"=\n");
token = strtok_r(NULL,"=\n",&saveptr);
if(token!=NULL){
val_len = strcspn(token," \n\0");
if((values[pc] = calloc((val_len + 1),sizeof(char))) != NULL){
@ -997,6 +998,7 @@ int process_opts(int argc, char** argv)
int fd, buffsize = 1024;
int rd,rdsz, rval = 0, error = 0;
size_t fsize;
char* saveptr;
char *buff = calloc(buffsize,sizeof(char)), *tok = NULL;
/**Parse 'harness.cnf' file*/
@ -1027,16 +1029,16 @@ int process_opts(int argc, char** argv)
instance.session_count = 1;
rdsz = read(fd,buff,fsize);
buff[rdsz] = '\0';
tok = strtok(buff,"=");
tok = strtok_r(buff,"=",&saveptr);
while(tok){
if(!strcmp(tok,"threads")){
tok = strtok(NULL,"\n\0");
tok = strtok_r(NULL,"\n\0",&saveptr);
instance.thrcount = strtol(tok,0,0);
}else if(!strcmp(tok,"sessions")){
tok = strtok(NULL,"\n\0");
tok = strtok_r(NULL,"\n\0",&saveptr);
instance.session_count = strtol(tok,0,0);
}
tok = strtok(NULL,"=");
tok = strtok_r(NULL,"=",&saveptr);
}

View File

@ -20,7 +20,6 @@ int main(int argc, char** argv){
printf("\n\n\tFilter Test Harness\n\n");
}
while(instance.running){
printf("Harness> ");
memset(buffer,0,256);

View File

@ -488,8 +488,11 @@ char *ptr;
}
static int
cmp_topn(TOPNQ **a, TOPNQ **b)
cmp_topn(const void *va, const void *vb)
{
TOPNQ **a = (TOPNQ **)va;
TOPNQ **b = (TOPNQ **)vb;
if ((*b)->duration.tv_sec == (*a)->duration.tv_sec)
return (*b)->duration.tv_usec - (*a)->duration.tv_usec;
return (*b)->duration.tv_sec - (*a)->duration.tv_sec;

View File

@ -172,6 +172,7 @@ typedef struct router_slave {
int overrun;
uint32_t rank; /*< Replication rank */
uint8_t seqno; /*< Replication dump sequence no */
uint32_t lastEventTimestamp;/*< Last event timestamp sent */
SPINLOCK catch_lock; /*< Event catchup lock */
unsigned int cstate; /*< Catch up state */
SPINLOCK rses_lock; /*< Protects rses_deleted */
@ -257,6 +258,7 @@ typedef struct router_instance {
SESSION *session; /*< Fake session for master connection */
unsigned int master_state; /*< State of the master FSM */
uint8_t lastEventReceived;
uint32_t lastEventTimestamp; /*< Timestamp from last event */
GWBUF *residual; /*< Any residual binlog event */
MASTER_RESPONSES saved_master; /*< Saved master responses */
char *binlogdir; /*< The directory with the binlog files */

View File

@ -569,7 +569,7 @@ int gw_read_client_event(
int rc = 0;
int nbytes_read = 0;
uint8_t cap = 0;
bool stmt_input; /*< router input type */
bool stmt_input = false; /*< router input type */
CHK_DCB(dcb);
protocol = DCB_PROTOCOL(dcb, MySQLProtocol);

View File

@ -367,6 +367,7 @@ unsigned char *defuuid;
inst->residual = NULL;
inst->slaves = NULL;
inst->next = NULL;
inst->lastEventTimestamp = 0;
/*
* Read any cached response messages
@ -486,6 +487,7 @@ ROUTER_SLAVE *slave;
slave->file = NULL;
strcpy(slave->binlogfile, "unassigned");
slave->connect_time = time(0);
slave->lastEventTimestamp = 0;
/**
* Add this session to the list of active sessions.
@ -777,11 +779,18 @@ struct tm tm;
buf);
dcb_printf(dcb, "\t (%d seconds ago)\n",
time(0) - router_inst->stats.lastReply);
dcb_printf(dcb, "\tLast event from master: 0x%x (%s)\n",
dcb_printf(dcb, "\tLast event from master: 0x%x, %s",
router_inst->lastEventReceived,
(router_inst->lastEventReceived >= 0 &&
router_inst->lastEventReceived < 0x24) ?
event_names[router_inst->lastEventReceived] : "unknown");
if (router_inst->lastEventTimestamp)
{
localtime_r(&router_inst->lastEventTimestamp, &tm);
asctime_r(&tm, buf);
dcb_printf(dcb, "\tLast binlog event timestamp: %ld (%s)\n",
router_inst->lastEventTimestamp, buf);
}
if (router_inst->active_logs)
dcb_printf(dcb, "\tRouter processing binlog records\n");
if (router_inst->reconnect_pending)
@ -865,6 +874,9 @@ struct tm tm;
dcb_printf(dcb,
"\t\tNo. events sent: %u\n",
session->stats.n_events);
dcb_printf(dcb,
"\t\tNo. bytes sent: %u\n",
session->stats.n_bytes);
dcb_printf(dcb,
"\t\tNo. bursts sent: %u\n",
session->stats.n_bursts);
@ -890,6 +902,14 @@ struct tm tm;
dcb_printf(dcb, "\t\tNo. of distribute action 2 %u\n", session->stats.n_actions[1]);
dcb_printf(dcb, "\t\tNo. of distribute action 3 %u\n", session->stats.n_actions[2]);
#endif
if (session->lastEventTimestamp
&& router_inst->lastEventTimestamp)
{
localtime_r(&session->lastEventTimestamp, &tm);
asctime_r(&tm, buf);
dcb_printf(dcb, "\t\tLast binlog event timestamp %u, %s", session->lastEventTimestamp, buf);
dcb_printf(dcb, "\t\tSeconds behind master %u\n", router_inst->lastEventTimestamp - session->lastEventTimestamp);
}
if ((session->cstate & CS_UPTODATE) == 0)
{
@ -1151,6 +1171,13 @@ int len;
return slave->dcb->func.write(slave->dcb, ret);
}
/**
* Respond to a COM_PING command
*
* @param router The router instance
* @param slave The "slave" connection that requested the ping
* @param queue The ping request
*/
int
blr_ping(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
{

View File

@ -456,7 +456,7 @@ char query[128];
break;
case BLRM_MUUID:
{
char *val = blr_extract_column(buf, 1);
char *val = blr_extract_column(buf, 2);
router->master_uuid = val;
// Response to the SERVER_UUID, should be stored
@ -925,6 +925,7 @@ static REP_HEADER phdr;
}
router->stats.n_binlogs++;
router->lastEventReceived = hdr.event_type;
router->lastEventTimestamp = hdr.timestamp;
// #define SHOW_EVENTS
#ifdef SHOW_EVENTS
@ -1291,6 +1292,7 @@ int action;
* this is a rotate event. Send the event directly from
* memory to the slave.
*/
slave->lastEventTimestamp = hdr->timestamp;
pkt = gwbuf_alloc(hdr->event_size + 5);
buf = GWBUF_DATA(pkt);
encode_value(buf, hdr->event_size + 1, 24);
@ -1303,6 +1305,7 @@ int action;
blr_slave_rotate(router, slave, ptr);
}
slave->stats.n_bytes += gwbuf_length(pkt);
slave->stats.n_events++;
slave->dcb->func.write(slave->dcb, pkt);
if (hdr->event_type != ROTATE_EVENT)
{

View File

@ -68,6 +68,7 @@ int blr_slave_callback(DCB *dcb, DCB_REASON reason, void *data);
static int blr_slave_fake_rotate(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
static void blr_slave_send_fde(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
static int blr_slave_send_maxscale_version(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
static int blr_slave_send_server_id(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
static int blr_slave_send_maxscale_variables(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
static int blr_slave_send_master_status(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
static int blr_slave_send_slave_status(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
@ -164,6 +165,7 @@ blr_slave_request(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
* SELECT @@hostname
* SELECT @@max_allowed_packet
* SELECT @@maxscale_version
* SELECT @@server_id
*
* Five show commands are supported:
* SHOW VARIABLES LIKE 'SERVER_ID'
@ -265,6 +267,11 @@ int query_len;
free(query_text);
return blr_slave_send_maxscale_version(router, slave);
}
else if (strcasecmp(word, "@@server_id") == 0)
{
free(query_text);
return blr_slave_send_server_id(router, slave);
}
}
else if (strcasecmp(word, "SHOW") == 0)
{
@ -544,6 +551,41 @@ int len, vers_len;
return blr_slave_send_eof(router, slave, 5);
}
/**
* Send a response the the SQL command SELECT @@server_id
*
* @param router The binlog router instance
* @param slave The slave server to which we are sending the response
* @return Non-zero if data was sent
*/
static int
blr_slave_send_server_id(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave)
{
GWBUF *pkt;
char server_id[40];
uint8_t *ptr;
int len, id_len;
sprintf(server_id, "%d", router->masterid);
id_len = strlen(server_id);
blr_slave_send_fieldcount(router, slave, 1);
blr_slave_send_columndef(router, slave, "SERVER_ID", 0xf, id_len, 2);
blr_slave_send_eof(router, slave, 3);
len = 5 + id_len;
if ((pkt = gwbuf_alloc(len)) == NULL)
return 0;
ptr = GWBUF_DATA(pkt);
encode_value(ptr, id_len + 1, 24); // Add length of data packet
ptr += 3;
*ptr++ = 0x04; // Sequence number in response
*ptr++ = id_len; // Length of result string
strncpy((char *)ptr, server_id, id_len); // Result string
ptr += id_len;
slave->dcb->func.write(slave->dcb, pkt);
return blr_slave_send_eof(router, slave, 5);
}
/**
* Send the response to the SQL command "SHOW VARIABLES LIKE 'MAXSCALE%'
@ -647,8 +689,8 @@ static char *slave_status_columns[] = {
"Last_Errno", "Last_Error", "Skip_Counter", "Exec_Master_Log_Pos", "Relay_Log_Space",
"Until_Condition", "Until_Log_File", "Until_Log_Pos", "Master_SSL_Allowed",
"Master_SSL_CA_File", "Master_SSL_CA_Path", "Master_SSL_Cert", "Master_SSL_Cipher",
"Master_SSL_Key",
"Seconds_Behind_Master", "Last_IO_Errno", "Last_IO_Error", "Last_SQL_Errno",
"Master_SSL_Key", "Seconds_Behind_Master",
"Master_SSL_Verify_Server_Cert", "Last_IO_Errno", "Last_IO_Error", "Last_SQL_Errno",
"Last_SQL_Error", "Replicate_Ignore_Server_Ids", "Master_Server_Id", "Master_UUID",
"Master_Info_File", "SQL_Delay", "SQL_Remaining_Delay", "Slave_SQL_Running_State",
"Master_Retry_Count", "Master_Bind", "Last_IO_Error_TimeStamp",
@ -856,9 +898,15 @@ int len, actual_len, col_len, seqno, ncols, i;
ptr += col_len;
*ptr++ = 0;
*ptr++ = 0;
/* Master_Server_Id */
sprintf(column, "%d", router->masterid);
col_len = strlen(column);
*ptr++ = col_len; // Length of result string
strncpy((char *)ptr, column, col_len); // Result string
ptr += col_len;
sprintf(column, "%s", router->master_uuid ?
router->master_uuid : router->uuid);
col_len = strlen(column);
@ -1343,6 +1391,7 @@ uint8_t *ptr;
*ptr++ = slave->seqno++;
*ptr++ = 0; // OK
head = gwbuf_append(head, record);
slave->lastEventTimestamp = hdr.timestamp;
if (hdr.event_type == ROTATE_EVENT)
{
unsigned long beat1 = hkheartbeat;