Merge branch 'develop' into MAX-324
This commit is contained in:
@ -58,7 +58,7 @@
|
||||
* combinations of username and network, either the value any or all,
|
||||
* depending on how you want to match the rules, and one or more rule names.
|
||||
*@code{.unparsed}
|
||||
* users NAME ... match [any|all] rules RULE ...
|
||||
* users NAME ... match [any|all|strict_all] rules RULE ...
|
||||
*@endcode
|
||||
*/
|
||||
#include <my_config.h>
|
||||
@ -166,6 +166,7 @@ typedef struct queryspeed_t{
|
||||
int count; /*< Number of queries done */
|
||||
int limit; /*< Maximum number of queries */
|
||||
long id; /*< Unique id of the rule */
|
||||
bool active; /*< If the rule has been triggered */
|
||||
struct queryspeed_t* next; /*< Next node in the list */
|
||||
}QUERYSPEED;
|
||||
|
||||
@ -200,6 +201,9 @@ typedef struct user_t{
|
||||
QUERYSPEED* qs_limit;/*< The query speed structure unique to this user */
|
||||
RULELIST* rules_or;/*< If any of these rules match the action is triggered */
|
||||
RULELIST* rules_and;/*< All of these rules must match for the action to trigger */
|
||||
RULELIST* rules_strict_and; /*< rules that skip the rest of the rules if one of them
|
||||
* fails. This is only for rules paired with 'match strict_all'. */
|
||||
|
||||
}USER;
|
||||
|
||||
/**
|
||||
@ -625,6 +629,7 @@ void add_users(char* rule, FW_INSTANCE* instance)
|
||||
instance->userstrings = link;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Parses the list of rule strings for users and links them against the listed rules.
|
||||
* Only adds those rules that are found. If the rule isn't found a message is written to the error log.
|
||||
@ -641,7 +646,7 @@ void link_rules(char* rule, FW_INSTANCE* instance)
|
||||
char *tok, *ruleptr, *userptr, *modeptr;
|
||||
char *saveptr = NULL;
|
||||
RULELIST* rulelist = NULL;
|
||||
|
||||
bool strict = false;
|
||||
userptr = strstr(rule,"users ");
|
||||
modeptr = strstr(rule," match ");
|
||||
ruleptr = strstr(rule," rules ");
|
||||
@ -662,6 +667,9 @@ void link_rules(char* rule, FW_INSTANCE* instance)
|
||||
match_any = true;
|
||||
}else if(strcmp(tok,"all") == 0){
|
||||
match_any = false;
|
||||
}else if(strcmp(tok,"strict_all") == 0){
|
||||
match_any = false;
|
||||
strict = true;
|
||||
}else{
|
||||
skygw_log_write(LOGFILE_ERROR, "fwfilter: Rule syntax incorrect, 'match' was not followed by 'any' or 'all': %s",rule);
|
||||
return;
|
||||
@ -730,10 +738,15 @@ void link_rules(char* rule, FW_INSTANCE* instance)
|
||||
if(match_any){
|
||||
tail->next = user->rules_or;
|
||||
user->rules_or = tl;
|
||||
}else{
|
||||
tail->next = user->rules_and;
|
||||
user->rules_and = tl;
|
||||
}else if(strict){
|
||||
tail->next = user->rules_and;
|
||||
user->rules_strict_and = tl;
|
||||
}
|
||||
else
|
||||
{
|
||||
tail->next = user->rules_and;
|
||||
user->rules_and = tl;
|
||||
}
|
||||
|
||||
hashtable_add(instance->htable,
|
||||
(void *)userptr,
|
||||
@ -1295,11 +1308,6 @@ bool rule_matches(FW_INSTANCE* my_instance, FW_SESSION* my_session, GWBUF *queue
|
||||
time_t time_now;
|
||||
struct tm* tm_now;
|
||||
|
||||
if(my_session->errmsg){
|
||||
free(my_session->errmsg);
|
||||
my_session->errmsg = NULL;
|
||||
}
|
||||
|
||||
time(&time_now);
|
||||
tm_now = localtime(&time_now);
|
||||
|
||||
@ -1439,43 +1447,56 @@ bool rule_matches(FW_INSTANCE* my_instance, FW_SESSION* my_session, GWBUF *queue
|
||||
queryspeed->next = user->qs_limit;
|
||||
user->qs_limit = queryspeed;
|
||||
}
|
||||
|
||||
if(queryspeed->count > queryspeed->limit)
|
||||
{
|
||||
queryspeed->triggered = time_now;
|
||||
queryspeed->count = 0;
|
||||
matches = true;
|
||||
|
||||
|
||||
skygw_log_write(LOGFILE_TRACE,
|
||||
"fwfilter: rule '%s': query limit triggered (%d queries in %f seconds), denying queries from user for %f seconds.",
|
||||
rulelist->rule->name,
|
||||
queryspeed->limit,
|
||||
queryspeed->period,
|
||||
queryspeed->cooldown);
|
||||
double blocked_for = queryspeed->cooldown - difftime(time_now,queryspeed->triggered);
|
||||
sprintf(emsg,"Queries denied for %f seconds",blocked_for);
|
||||
msg = strdup(emsg);
|
||||
}
|
||||
else if(difftime(time_now,queryspeed->triggered) < queryspeed->cooldown)
|
||||
{
|
||||
|
||||
double blocked_for = queryspeed->cooldown - difftime(time_now,queryspeed->triggered);
|
||||
|
||||
sprintf(emsg,"Queries denied for %f seconds",blocked_for);
|
||||
skygw_log_write(LOGFILE_TRACE, "fwfilter: rule '%s': user denied for %f seconds",rulelist->rule->name,blocked_for);
|
||||
msg = strdup(emsg);
|
||||
|
||||
matches = true;
|
||||
}
|
||||
else if(difftime(time_now,queryspeed->first_query) < queryspeed->period)
|
||||
{
|
||||
queryspeed->count++;
|
||||
}
|
||||
else
|
||||
{
|
||||
queryspeed->first_query = time_now;
|
||||
}
|
||||
|
||||
if(queryspeed->active)
|
||||
{
|
||||
if(difftime(time_now,queryspeed->triggered) < queryspeed->cooldown)
|
||||
{
|
||||
|
||||
double blocked_for = queryspeed->cooldown - difftime(time_now,queryspeed->triggered);
|
||||
|
||||
sprintf(emsg,"Queries denied for %f seconds",blocked_for);
|
||||
skygw_log_write(LOGFILE_TRACE, "fwfilter: rule '%s': user denied for %f seconds",rulelist->rule->name,blocked_for);
|
||||
msg = strdup(emsg);
|
||||
|
||||
matches = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
queryspeed->active = false;
|
||||
queryspeed->count = 0;
|
||||
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if(queryspeed->count >= queryspeed->limit)
|
||||
{
|
||||
queryspeed->triggered = time_now;
|
||||
matches = true;
|
||||
queryspeed->active = true;
|
||||
|
||||
skygw_log_write(LOGFILE_TRACE,
|
||||
"fwfilter: rule '%s': query limit triggered (%d queries in %f seconds), denying queries from user for %f seconds.",
|
||||
rulelist->rule->name,
|
||||
queryspeed->limit,
|
||||
queryspeed->period,
|
||||
queryspeed->cooldown);
|
||||
double blocked_for = queryspeed->cooldown - difftime(time_now,queryspeed->triggered);
|
||||
sprintf(emsg,"Queries denied for %f seconds",blocked_for);
|
||||
msg = strdup(emsg);
|
||||
}
|
||||
else if(queryspeed->count > 0 &&
|
||||
difftime(time_now,queryspeed->first_query) <= queryspeed->period)
|
||||
{
|
||||
queryspeed->count++;
|
||||
}
|
||||
else
|
||||
{
|
||||
queryspeed->first_query = time_now;
|
||||
queryspeed->count = 1;
|
||||
}
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
@ -1499,7 +1520,11 @@ bool rule_matches(FW_INSTANCE* my_instance, FW_SESSION* my_session, GWBUF *queue
|
||||
|
||||
queryresolved:
|
||||
if(msg){
|
||||
my_session->errmsg = msg;
|
||||
if(my_session->errmsg){
|
||||
free(my_session->errmsg);
|
||||
}
|
||||
|
||||
my_session->errmsg = msg;
|
||||
}
|
||||
|
||||
if(matches){
|
||||
@ -1536,7 +1561,10 @@ bool check_match_any(FW_INSTANCE* my_instance, FW_SESSION* my_session, GWBUF *qu
|
||||
memset(fullquery + qlen,0,1);
|
||||
}
|
||||
|
||||
rulelist = user->rules_or;
|
||||
if((rulelist = user->rules_or) == NULL)
|
||||
{
|
||||
goto retblock;
|
||||
}
|
||||
|
||||
while(rulelist){
|
||||
|
||||
@ -1544,9 +1572,10 @@ bool check_match_any(FW_INSTANCE* my_instance, FW_SESSION* my_session, GWBUF *qu
|
||||
rulelist = rulelist->next;
|
||||
continue;
|
||||
}
|
||||
if((rval = rule_matches(my_instance,my_session,queue,user,rulelist,fullquery))){
|
||||
goto retblock;
|
||||
if((rval = rule_matches(my_instance,my_session,queue,user,rulelist,fullquery))){
|
||||
goto retblock;
|
||||
}
|
||||
|
||||
rulelist = rulelist->next;
|
||||
}
|
||||
|
||||
@ -1565,9 +1594,9 @@ bool check_match_any(FW_INSTANCE* my_instance, FW_SESSION* my_session, GWBUF *qu
|
||||
* @param user The user whose rulelist is checked
|
||||
* @return True if the query matches all of the rules otherwise false
|
||||
*/
|
||||
bool check_match_all(FW_INSTANCE* my_instance, FW_SESSION* my_session, GWBUF *queue, USER* user)
|
||||
bool check_match_all(FW_INSTANCE* my_instance, FW_SESSION* my_session, GWBUF *queue, USER* user,bool strict_all)
|
||||
{
|
||||
bool is_sql, rval = 0;
|
||||
bool is_sql, rval = true;
|
||||
int qlen;
|
||||
char *fullquery = NULL,*ptr;
|
||||
|
||||
@ -1585,23 +1614,38 @@ bool check_match_all(FW_INSTANCE* my_instance, FW_SESSION* my_session, GWBUF *qu
|
||||
|
||||
|
||||
}
|
||||
|
||||
rulelist = user->rules_or;
|
||||
|
||||
|
||||
if(strict_all)
|
||||
{
|
||||
rulelist = user->rules_strict_and;
|
||||
}
|
||||
else
|
||||
{
|
||||
rulelist = user->rules_and;
|
||||
}
|
||||
|
||||
if(rulelist == NULL)
|
||||
{
|
||||
rval = false;
|
||||
goto retblock;
|
||||
}
|
||||
|
||||
while(rulelist){
|
||||
|
||||
if(!rule_is_active(rulelist->rule)){
|
||||
rulelist = rulelist->next;
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
if(!rule_matches(my_instance,my_session,queue,user,rulelist,fullquery)){
|
||||
rval = false;
|
||||
goto retblock;
|
||||
if(strict_all)
|
||||
break;
|
||||
}
|
||||
rulelist = rulelist->next;
|
||||
}
|
||||
|
||||
|
||||
retblock:
|
||||
|
||||
free(fullquery);
|
||||
@ -1664,7 +1708,12 @@ routeQuery(FILTER *instance, void *session, GWBUF *queue)
|
||||
goto queryresolved;
|
||||
}
|
||||
|
||||
if(check_match_all(my_instance,my_session,queue,user)){
|
||||
if(check_match_all(my_instance,my_session,queue,user,false)){
|
||||
accept = false;
|
||||
goto queryresolved;
|
||||
}
|
||||
|
||||
if(check_match_all(my_instance,my_session,queue,user,true)){
|
||||
accept = false;
|
||||
goto queryresolved;
|
||||
}
|
||||
|
||||
@ -78,6 +78,8 @@
|
||||
#include <spinlock.h>
|
||||
#include <session.h>
|
||||
#include <plugin.h>
|
||||
#include <housekeeper.h>
|
||||
|
||||
MODULE_INFO info = {
|
||||
MODULE_API_FILTER,
|
||||
MODULE_ALPHA_RELEASE,
|
||||
@ -87,7 +89,7 @@ MODULE_INFO info = {
|
||||
|
||||
static char *version_str = "V1.0.2";
|
||||
static int uid_gen;
|
||||
|
||||
static int hktask_id = 0;
|
||||
/*
|
||||
* The filter entry points
|
||||
*/
|
||||
@ -179,6 +181,16 @@ typedef struct object_trigger_t{
|
||||
int size;
|
||||
}OBJ_TRIG;
|
||||
|
||||
/**
|
||||
* Statistics for the mqfilter.
|
||||
*/
|
||||
typedef struct mqstats_t{
|
||||
int n_msg; /*< Total number of messages */
|
||||
int n_sent; /*< Number of sent messages */
|
||||
int n_queued; /*< Number of unsent messages */
|
||||
|
||||
}MQSTATS;
|
||||
|
||||
|
||||
/**
|
||||
* A instance structure, containing the hostname, login credentials,
|
||||
@ -190,7 +202,6 @@ typedef struct object_trigger_t{
|
||||
* routing key named 'key'. Type of the exchange is 'direct' by default and all queries are logged.
|
||||
*
|
||||
*/
|
||||
|
||||
typedef struct {
|
||||
int port;
|
||||
char *hostname;
|
||||
@ -213,12 +224,14 @@ typedef struct {
|
||||
int conn_stat; /**state of the connection to the server*/
|
||||
int rconn_intv; /**delay for reconnects, in seconds*/
|
||||
time_t last_rconn; /**last reconnect attempt*/
|
||||
SPINLOCK* rconn_lock;
|
||||
SPINLOCK rconn_lock;
|
||||
SPINLOCK msg_lock;
|
||||
mqmessage* messages;
|
||||
enum log_trigger_t trgtype;
|
||||
SRC_TRIG* src_trg;
|
||||
SHM_TRIG* shm_trg;
|
||||
OBJ_TRIG* obj_trg;
|
||||
MQSTATS stats;
|
||||
} MQ_INSTANCE;
|
||||
|
||||
/**
|
||||
@ -239,6 +252,9 @@ typedef struct {
|
||||
bool was_query; /**True if the previous routeQuery call had valid content*/
|
||||
} MQ_SESSION;
|
||||
|
||||
void sendMessage(void* data);
|
||||
|
||||
|
||||
/**
|
||||
* Implementation of the mandatory version entry point
|
||||
*
|
||||
@ -426,6 +442,8 @@ char** parse_optstr(char* str, char* tok, int* szstore)
|
||||
char *lasts, *tk = str;
|
||||
char **arr;
|
||||
int i = 0, size = 1;
|
||||
|
||||
|
||||
while((tk = strpbrk(tk + 1,tok))){
|
||||
size++;
|
||||
}
|
||||
@ -463,11 +481,12 @@ createInstance(char **options, FILTER_PARAMETER **params)
|
||||
int paramcount = 0, parammax = 64, i = 0, x = 0, arrsize = 0;
|
||||
FILTER_PARAMETER** paramlist;
|
||||
char** arr;
|
||||
char taskname[512];
|
||||
|
||||
if ((my_instance = calloc(1, sizeof(MQ_INSTANCE)))&&
|
||||
(my_instance->rconn_lock = malloc(sizeof(SPINLOCK))))
|
||||
if ((my_instance = calloc(1, sizeof(MQ_INSTANCE))))
|
||||
{
|
||||
spinlock_init(my_instance->rconn_lock);
|
||||
spinlock_init(&my_instance->rconn_lock);
|
||||
spinlock_init(&my_instance->msg_lock);
|
||||
uid_gen = 0;
|
||||
paramlist = malloc(sizeof(FILTER_PARAMETER*)*64);
|
||||
|
||||
@ -510,6 +529,7 @@ createInstance(char **options, FILTER_PARAMETER **params)
|
||||
}else if(!strcmp(params[i]->name,"ssl_client_key")){
|
||||
|
||||
my_instance->ssl_client_key = strdup(params[i]->value);
|
||||
|
||||
}else if(!strcmp(params[i]->name,"ssl_CA_cert")){
|
||||
|
||||
my_instance->ssl_CA_cert = strdup(params[i]->value);
|
||||
@ -617,11 +637,11 @@ createInstance(char **options, FILTER_PARAMETER **params)
|
||||
}
|
||||
|
||||
}else if(!strcmp(paramlist[i]->name,"logging_log_all")){
|
||||
if(!strcmp(paramlist[i]->value,"true")){
|
||||
if(config_truth_value(paramlist[i]->value)){
|
||||
my_instance->log_all = true;
|
||||
}
|
||||
}else if(!strcmp(paramlist[i]->name,"logging_strict")){
|
||||
if(strcmp(paramlist[i]->value,"false") == 0){
|
||||
if(!config_truth_value(paramlist[i]->value)){
|
||||
my_instance->strict_logging = false;
|
||||
}
|
||||
}
|
||||
@ -669,7 +689,8 @@ createInstance(char **options, FILTER_PARAMETER **params)
|
||||
/**Connect to the server*/
|
||||
init_conn(my_instance);
|
||||
|
||||
|
||||
snprintf(taskname,511,"mqtask%d",atomic_add(&hktask_id,1));
|
||||
hktask_add(taskname,sendMessage,(void*)my_instance,5);
|
||||
|
||||
}
|
||||
return (FILTER *)my_instance;
|
||||
@ -689,7 +710,7 @@ int declareQueue(MQ_INSTANCE *my_instance, MQ_SESSION* my_session, char* qname)
|
||||
int success = 1;
|
||||
amqp_rpc_reply_t reply;
|
||||
|
||||
spinlock_acquire(my_instance->rconn_lock);
|
||||
spinlock_acquire(&my_instance->rconn_lock);
|
||||
|
||||
amqp_queue_declare(my_instance->conn,my_instance->channel,
|
||||
amqp_cstring_bytes(qname),
|
||||
@ -716,20 +737,23 @@ int declareQueue(MQ_INSTANCE *my_instance, MQ_SESSION* my_session, char* qname)
|
||||
"Error : Failed to bind queue to exchange.");
|
||||
|
||||
}
|
||||
spinlock_release(my_instance->rconn_lock);
|
||||
spinlock_release(&my_instance->rconn_lock);
|
||||
return success;
|
||||
}
|
||||
|
||||
/**
|
||||
* Broadcasts a message on the message stack to the RabbitMQ server
|
||||
* and frees the allocated memory if successful.
|
||||
* @return AMQP_STATUS_OK if the broadcasting was successful
|
||||
* and frees the allocated memory if successful. This function is only called by
|
||||
* the housekeeper thread.
|
||||
* @param data MQfilter instance
|
||||
*/
|
||||
int sendMessage(MQ_INSTANCE *instance)
|
||||
void sendMessage(void* data)
|
||||
{
|
||||
int err_code;
|
||||
MQ_INSTANCE *instance = (MQ_INSTANCE*)data;
|
||||
mqmessage *tmp;
|
||||
|
||||
int err_num;
|
||||
|
||||
spinlock_acquire(&instance->rconn_lock);
|
||||
if(instance->conn_stat != AMQP_STATUS_OK){
|
||||
|
||||
if(difftime(time(NULL),instance->last_rconn) > instance->rconn_intv){
|
||||
@ -746,29 +770,70 @@ int sendMessage(MQ_INSTANCE *instance)
|
||||
"Error : Failed to reconnect to the MQRabbit server ");
|
||||
}
|
||||
}
|
||||
err_num = instance->conn_stat;
|
||||
}
|
||||
|
||||
if(instance->messages){
|
||||
instance->conn_stat = amqp_basic_publish(instance->conn,instance->channel,
|
||||
spinlock_release(&instance->rconn_lock);
|
||||
|
||||
if(err_num != AMQP_STATUS_OK)
|
||||
{
|
||||
/** No connection to the broker */
|
||||
return;
|
||||
}
|
||||
|
||||
spinlock_acquire(&instance->msg_lock);
|
||||
tmp = instance->messages;
|
||||
|
||||
if(tmp == NULL)
|
||||
{
|
||||
spinlock_release(&instance->msg_lock);
|
||||
return;
|
||||
}
|
||||
|
||||
instance->messages = instance->messages->next;
|
||||
spinlock_release(&instance->msg_lock);
|
||||
|
||||
while(tmp){
|
||||
|
||||
err_num = amqp_basic_publish(instance->conn,instance->channel,
|
||||
amqp_cstring_bytes(instance->exchange),
|
||||
amqp_cstring_bytes(instance->key),
|
||||
0,0,instance->messages->prop,amqp_cstring_bytes(instance->messages->msg));
|
||||
0,0,tmp->prop,amqp_cstring_bytes(tmp->msg));
|
||||
|
||||
|
||||
/**Message was sent successfully*/
|
||||
if(instance->conn_stat == AMQP_STATUS_OK){
|
||||
tmp = instance->messages;
|
||||
instance->messages = instance->messages->next;
|
||||
spinlock_acquire(&instance->rconn_lock);
|
||||
instance->conn_stat = err_num;
|
||||
spinlock_release(&instance->rconn_lock);
|
||||
|
||||
if(err_num == AMQP_STATUS_OK){
|
||||
/**Message was sent successfully*/
|
||||
free(tmp->prop);
|
||||
free(tmp->msg);
|
||||
free(tmp);
|
||||
|
||||
atomic_add(&instance->stats.n_sent,1);
|
||||
atomic_add(&instance->stats.n_queued,-1);
|
||||
spinlock_acquire(&instance->msg_lock);
|
||||
tmp = instance->messages;
|
||||
|
||||
if(tmp == NULL)
|
||||
{
|
||||
spinlock_release(&instance->msg_lock);
|
||||
return;
|
||||
}
|
||||
|
||||
instance->messages = instance->messages->next;
|
||||
spinlock_release(&instance->msg_lock);
|
||||
}
|
||||
else
|
||||
{
|
||||
spinlock_acquire(&instance->msg_lock);
|
||||
tmp->next = instance->messages;
|
||||
instance->messages = tmp;
|
||||
spinlock_release(&instance->msg_lock);
|
||||
return;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
err_code = instance->conn_stat;
|
||||
|
||||
return err_code;
|
||||
}
|
||||
|
||||
|
||||
@ -780,34 +845,30 @@ int sendMessage(MQ_INSTANCE *instance)
|
||||
*/
|
||||
void pushMessage(MQ_INSTANCE *instance, amqp_basic_properties_t* prop, char* msg)
|
||||
{
|
||||
spinlock_acquire(instance->rconn_lock);
|
||||
|
||||
mqmessage* newmsg = malloc(sizeof(mqmessage));
|
||||
mqmessage* newmsg = calloc(1,sizeof(mqmessage));
|
||||
if(newmsg){
|
||||
|
||||
newmsg->msg = msg;
|
||||
newmsg->prop = prop;
|
||||
newmsg->next = NULL;
|
||||
|
||||
if(instance->messages){
|
||||
newmsg->next = instance->messages;
|
||||
}
|
||||
|
||||
instance->messages = newmsg;
|
||||
|
||||
}else{
|
||||
skygw_log_write(LOGFILE_ERROR,
|
||||
"Error : Cannot allocate enough memory.");
|
||||
free(prop);
|
||||
free(msg);
|
||||
return;
|
||||
}
|
||||
|
||||
while(instance->messages){
|
||||
if(sendMessage(instance) != AMQP_STATUS_OK){
|
||||
break;
|
||||
}
|
||||
}
|
||||
spinlock_acquire(&instance->msg_lock);
|
||||
|
||||
newmsg->next = instance->messages;
|
||||
instance->messages = newmsg;
|
||||
|
||||
spinlock_release(instance->rconn_lock);
|
||||
spinlock_release(&instance->msg_lock);
|
||||
|
||||
atomic_add(&instance->stats.n_msg,1);
|
||||
atomic_add(&instance->stats.n_queued,1);
|
||||
}
|
||||
|
||||
|
||||
@ -1458,12 +1519,18 @@ diagnostic(FILTER *instance, void *fsession, DCB *dcb)
|
||||
|
||||
if (my_instance)
|
||||
{
|
||||
dcb_printf(dcb, "\t\tConnecting to %s:%d as %s/%s.\nVhost: %s\tExchange: %s\tKey: %s\tQueue: %s\n",
|
||||
dcb_printf(dcb, "Connecting to %s:%d as '%s'.\nVhost: %s\tExchange: %s\nKey: %s\tQueue: %s\n\n",
|
||||
my_instance->hostname,my_instance->port,
|
||||
my_instance->username,my_instance->password,
|
||||
my_instance->username,
|
||||
my_instance->vhost, my_instance->exchange,
|
||||
my_instance->key, my_instance->queue
|
||||
);
|
||||
dcb_printf(dcb, "%-16s%-16s%-16s\n",
|
||||
"Messages","Queued","Sent");
|
||||
dcb_printf(dcb, "%-16d%-16d%-16d\n",
|
||||
my_instance->stats.n_msg,
|
||||
my_instance->stats.n_queued,
|
||||
my_instance->stats.n_sent);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -187,7 +187,7 @@ int main(int argc, char** argv){
|
||||
}
|
||||
|
||||
instance.thrpool = t_thr_pool;
|
||||
int thr_num = 1;
|
||||
intptr_t thr_num = 1;
|
||||
|
||||
for(i = 0;i<instance.thrcount;i++){
|
||||
|
||||
|
||||
@ -690,7 +690,7 @@ int log_no_master = 1;
|
||||
if (!(SERVER_IS_RUNNING(ptr->server)) ||
|
||||
!(SERVER_IS_IN_CLUSTER(ptr->server)))
|
||||
{
|
||||
dcb_call_foreach(DCB_REASON_NOT_RESPONDING);
|
||||
dcb_call_foreach(ptr->server,DCB_REASON_NOT_RESPONDING);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -31,8 +31,10 @@
|
||||
* @verbatim
|
||||
* Revision History
|
||||
*
|
||||
* Date Who Description
|
||||
* Date Who Description
|
||||
* 02/04/2014 Mark Riddoch Initial implementation
|
||||
* 17/02/2015 Massimiliano Pinto Addition of slave port and username in diagnostics
|
||||
* 18/02/2015 Massimiliano Pinto Addition of dcb_close in closeSession
|
||||
*
|
||||
* @endverbatim
|
||||
*/
|
||||
@ -625,6 +627,14 @@ ROUTER_SLAVE *slave = (ROUTER_SLAVE *)router_session;
|
||||
|
||||
/* Unlock */
|
||||
rses_end_locked_router_action(slave);
|
||||
|
||||
/**
|
||||
* Close the slave server connection
|
||||
*/
|
||||
if (slave->dcb != NULL) {
|
||||
CHK_DCB(slave->dcb);
|
||||
dcb_close(slave->dcb);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -848,8 +858,11 @@ struct tm tm;
|
||||
if (session->uuid)
|
||||
dcb_printf(dcb, "\t\tSlave UUID: %s\n", session->uuid);
|
||||
dcb_printf(dcb,
|
||||
"\t\tSlave: %s\n",
|
||||
session->dcb->remote);
|
||||
"\t\tSlave_host_port: %s:%d\n",
|
||||
session->dcb->remote, ntohs((session->dcb->ipv4).sin_port));
|
||||
dcb_printf(dcb,
|
||||
"\t\tUsername: %s\n",
|
||||
session->dcb->user);
|
||||
dcb_printf(dcb,
|
||||
"\t\tSlave DCB: %p\n",
|
||||
session->dcb);
|
||||
@ -911,18 +924,21 @@ struct tm tm;
|
||||
dcb_printf(dcb, "\t\tSeconds behind master %u\n", router_inst->lastEventTimestamp - session->lastEventTimestamp);
|
||||
}
|
||||
|
||||
if ((session->cstate & CS_UPTODATE) == 0)
|
||||
if (session->state == 0)
|
||||
{
|
||||
dcb_printf(dcb, "\t\tSlave is in catchup mode. %s%s\n",
|
||||
dcb_printf(dcb, "\t\tSlave_mode: connected\n");
|
||||
}
|
||||
else if ((session->cstate & CS_UPTODATE) == 0)
|
||||
{
|
||||
dcb_printf(dcb, "\t\tSlave_mode: catchup. %s%s\n",
|
||||
((session->cstate & CS_EXPECTCB) == 0 ? "" :
|
||||
"Waiting for DCB queue to drain."),
|
||||
((session->cstate & CS_BUSY) == 0 ? "" :
|
||||
" Busy in slave catchup."));
|
||||
|
||||
}
|
||||
else
|
||||
{
|
||||
dcb_printf(dcb, "\t\tSlave is in normal mode.\n");
|
||||
dcb_printf(dcb, "\t\tSlave_mode: follow\n");
|
||||
if (session->binlog_pos != router_inst->binlog_position)
|
||||
{
|
||||
dcb_printf(dcb, "\t\tSlave reports up to date however "
|
||||
|
||||
@ -387,14 +387,19 @@ char query[128];
|
||||
break;
|
||||
case BLRM_SERVERID:
|
||||
{
|
||||
char *val = blr_extract_column(buf, 1);
|
||||
char *val = blr_extract_column(buf, 2);
|
||||
|
||||
// Response to fetch of master's server-id
|
||||
if (router->saved_master.server_id)
|
||||
GWBUF_CONSUME_ALL(router->saved_master.server_id);
|
||||
router->saved_master.server_id = buf;
|
||||
blr_cache_response(router, "serverid", buf);
|
||||
// TODO: Extract the value of server-id and place in router->master_id
|
||||
|
||||
// set router->masterid from master server-id if it's not set by the config option
|
||||
if (router->masterid == 0) {
|
||||
router->masterid = atoi(val);
|
||||
}
|
||||
|
||||
{
|
||||
char str[80];
|
||||
sprintf(str, "SET @master_heartbeat_period = %lu000000000", router->heartbeat);
|
||||
|
||||
@ -31,8 +31,9 @@
|
||||
* @verbatim
|
||||
* Revision History
|
||||
*
|
||||
* Date Who Description
|
||||
* Date Who Description
|
||||
* 14/04/2014 Mark Riddoch Initial implementation
|
||||
* 18/02/2015 Massimiliano Pinto Addition of DISCONNECT ALL and DISCONNECT SERVER server_id
|
||||
*
|
||||
* @endverbatim
|
||||
*/
|
||||
@ -76,6 +77,9 @@ static int blr_slave_send_slave_hosts(ROUTER_INSTANCE *router, ROUTER_SLAVE *sla
|
||||
static int blr_slave_send_fieldcount(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, int count);
|
||||
static int blr_slave_send_columndef(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, char *name, int type, int len, uint8_t seqno);
|
||||
static int blr_slave_send_eof(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, int seqno);
|
||||
static int blr_slave_send_disconnected_server(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, int server_id, int found);
|
||||
static int blr_slave_disconnect_all(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
|
||||
static int blr_slave_disconnect_server(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, int server_id);
|
||||
|
||||
extern int lm_enabled_logfiles_bitmask;
|
||||
extern size_t log_ses_count[];
|
||||
@ -395,6 +399,35 @@ int query_len;
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (strcasecmp(query_text, "DISCONNECT") == 0)
|
||||
{
|
||||
if ((word = strtok_r(NULL, sep, &brkb)) == NULL)
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, "%s: Incomplete DISCONNECT command.",
|
||||
router->service->name)));
|
||||
|
||||
}
|
||||
else if (strcasecmp(word, "ALL") == 0)
|
||||
{
|
||||
free(query_text);
|
||||
spinlock_release(&router->lock);
|
||||
|
||||
return blr_slave_disconnect_all(router, slave);
|
||||
}
|
||||
else if (strcasecmp(word, "SERVER") == 0)
|
||||
{
|
||||
if ((word = strtok_r(NULL, sep, &brkb)) == NULL)
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
|
||||
"%s: Expected DISCONNECT SERVER $server_id",
|
||||
router->service->name)));
|
||||
} else {
|
||||
free(query_text);
|
||||
return blr_slave_disconnect_server(router, slave, atoi(word));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
free(query_text);
|
||||
|
||||
query_text = strndup(qtext, query_len);
|
||||
@ -1837,3 +1870,190 @@ uint8_t *ptr;
|
||||
encode_value(ptr, 2, 16); // Autocommit enabled
|
||||
return slave->dcb->func.write(slave->dcb, pkt);
|
||||
}
|
||||
|
||||
/**
|
||||
* Send the reply only to the SQL command "DISCONNECT SERVER $server_id'
|
||||
*
|
||||
* @param router The binlog router instance
|
||||
* @param slave The slave server to which we are sending the response
|
||||
* @return Non-zero if data was sent
|
||||
*/
|
||||
static int
|
||||
blr_slave_send_disconnected_server(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, int server_id, int found)
|
||||
{
|
||||
GWBUF *pkt;
|
||||
char state[40];
|
||||
char serverid[40];
|
||||
uint8_t *ptr;
|
||||
int len, id_len, seqno = 2;
|
||||
|
||||
blr_slave_send_fieldcount(router, slave, 2);
|
||||
blr_slave_send_columndef(router, slave, "server_id", 0x03, 40, seqno++);
|
||||
blr_slave_send_columndef(router, slave, "state", 0xf, 40, seqno++);
|
||||
blr_slave_send_eof(router, slave, seqno++);
|
||||
|
||||
sprintf(serverid, "%d", server_id);
|
||||
id_len = strlen(serverid);
|
||||
if (found)
|
||||
strcpy(state, "disconnected");
|
||||
else
|
||||
strcpy(state, "not found");
|
||||
|
||||
len = 5 + id_len + strlen(state) + 1;
|
||||
if ((pkt = gwbuf_alloc(len)) == NULL)
|
||||
return 0;
|
||||
ptr = GWBUF_DATA(pkt);
|
||||
encode_value(ptr, id_len + 2 + strlen(state), 24); // Add length of data packet
|
||||
ptr += 3;
|
||||
*ptr++ = seqno++; // Sequence number in response
|
||||
|
||||
*ptr++ = id_len; // Length of result string
|
||||
strncpy((char *)ptr, serverid, id_len); // Result string
|
||||
ptr += id_len;
|
||||
|
||||
*ptr++ = strlen(state); // Length of result string
|
||||
strncpy((char *)ptr, state, strlen(state)); // Result string
|
||||
ptr += strlen(state);
|
||||
|
||||
slave->dcb->func.write(slave->dcb, pkt);
|
||||
|
||||
return blr_slave_send_eof(router, slave, seqno++);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Send the response to the SQL command "DISCONNECT SERVER $server_id'
|
||||
* and close the connection to that server
|
||||
*
|
||||
* @param router The binlog router instance
|
||||
* @param slave The slave server to which we are sending the response
|
||||
* @param server_id The slave server_id to disconnect
|
||||
* @return Non-zero if data was sent to the client
|
||||
*/
|
||||
static int
|
||||
blr_slave_disconnect_server(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, int server_id)
|
||||
{
|
||||
ROUTER_OBJECT *router_obj= router->service->router;
|
||||
ROUTER_SLAVE *sptr;
|
||||
int n;
|
||||
int server_found = 0;
|
||||
|
||||
spinlock_acquire(&router->lock);
|
||||
|
||||
sptr = router->slaves;
|
||||
/* look for server_id among all registered slaves */
|
||||
while (sptr)
|
||||
{
|
||||
/* don't examine slaves with state = 0 */
|
||||
if (sptr->state != 0 && sptr->serverid == server_id)
|
||||
{
|
||||
/* server_id found */
|
||||
server_found = 1;
|
||||
|
||||
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, "DISCONNECT SERVER: closing [%s], server id [%d]",
|
||||
sptr->dcb->remote, server_id)));
|
||||
|
||||
/* send server_id with disconnect state to client */
|
||||
n = blr_slave_send_disconnected_server(router, slave, server_id, 1);
|
||||
|
||||
/* force session close for matched slave */
|
||||
router_obj->closeSession(router->service->router_instance, sptr);
|
||||
|
||||
break;
|
||||
} else {
|
||||
sptr = sptr->next;
|
||||
}
|
||||
}
|
||||
|
||||
spinlock_release(&router->lock);
|
||||
|
||||
/** server id was not found
|
||||
* send server_id with not found state to the client
|
||||
*/
|
||||
if (!server_found)
|
||||
{
|
||||
n = blr_slave_send_disconnected_server(router, slave, server_id, 0);
|
||||
}
|
||||
|
||||
return n;
|
||||
}
|
||||
|
||||
/**
|
||||
* Send the response to the SQL command "DISCONNECT ALL'
|
||||
* and close the connection to all slave servers
|
||||
*
|
||||
* @param router The binlog router instance
|
||||
* @param slave The slave server to which we are sending the response
|
||||
* @return Non-zero if data was sent to the client
|
||||
*/
|
||||
static int
|
||||
blr_slave_disconnect_all(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave)
|
||||
{
|
||||
ROUTER_OBJECT *router_obj= router->service->router;
|
||||
ROUTER_SLAVE *sptr;
|
||||
char server_id[40];
|
||||
char state[40];
|
||||
uint8_t *ptr;
|
||||
int len, seqno;
|
||||
GWBUF *pkt;
|
||||
int n = 0;
|
||||
|
||||
/* preparing output result */
|
||||
blr_slave_send_fieldcount(router, slave, 2);
|
||||
blr_slave_send_columndef(router, slave, "server_id", 0x03, 40, 2);
|
||||
blr_slave_send_columndef(router, slave, "state", 0xf, 40, 3);
|
||||
blr_slave_send_eof(router, slave, 4);
|
||||
seqno = 5;
|
||||
|
||||
spinlock_acquire(&router->lock);
|
||||
sptr = router->slaves;
|
||||
|
||||
while (sptr)
|
||||
{
|
||||
/* skip servers with state = 0 */
|
||||
if (sptr->state != 0)
|
||||
{
|
||||
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, "DISCONNECT ALL: closing [%s], server_id [%d]",
|
||||
sptr->dcb->remote, sptr->serverid)));
|
||||
|
||||
sprintf(server_id, "%d", sptr->serverid);
|
||||
sprintf(state, "disconnected");
|
||||
|
||||
len = 5 + strlen(server_id) + strlen(state) + 1;
|
||||
if ((pkt = gwbuf_alloc(len)) == NULL) {
|
||||
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, "Error: gwbuf memory allocation in "
|
||||
"DISCONNECT ALL for [%s], server_id [%d]",
|
||||
sptr->dcb->remote, sptr->serverid)));
|
||||
|
||||
spinlock_release(&router->lock);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
ptr = GWBUF_DATA(pkt);
|
||||
encode_value(ptr, len - 4, 24); // Add length of data packet
|
||||
|
||||
ptr += 3;
|
||||
*ptr++ = seqno++; // Sequence number in response
|
||||
*ptr++ = strlen(server_id); // Length of result string
|
||||
strncpy((char *)ptr, server_id, strlen(server_id)); // Result string
|
||||
ptr += strlen(server_id);
|
||||
*ptr++ = strlen(state); // Length of result string
|
||||
strncpy((char *)ptr, state, strlen(state)); // Result string
|
||||
ptr += strlen(state);
|
||||
|
||||
n = slave->dcb->func.write(slave->dcb, pkt);
|
||||
|
||||
/* force session close*/
|
||||
router_obj->closeSession(router->service->router_instance, sptr);
|
||||
|
||||
}
|
||||
sptr = sptr->next;
|
||||
}
|
||||
|
||||
spinlock_release(&router->lock);
|
||||
|
||||
blr_slave_send_eof(router, slave, seqno);
|
||||
|
||||
return n;
|
||||
}
|
||||
|
||||
@ -157,6 +157,10 @@ struct subcommand showoptions[] = {
|
||||
"Show all configured servers",
|
||||
"Show all configured servers",
|
||||
{0, 0, 0} },
|
||||
{ "serversjson", 0, dprintAllServersJson,
|
||||
"Show all configured servers in JSON format",
|
||||
"Show all configured servers in JSON format",
|
||||
{0, 0, 0} },
|
||||
{ "services", 0, dprintAllServices,
|
||||
"Show all configured services in MaxScale",
|
||||
"Show all configured services in MaxScale",
|
||||
|
||||
@ -1373,8 +1373,19 @@ static route_target_t get_route_target (
|
||||
* backends but since this is SELECT that is not possible:
|
||||
* 1. response set is not handled correctly in clientReply and
|
||||
* 2. multiple results can degrade performance.
|
||||
*
|
||||
* Prepared statements are an exception to this since they do not
|
||||
* actually do anything but only prepare the statement to be used.
|
||||
* They can be safely routed to all backends since the execution
|
||||
* is done later.
|
||||
*
|
||||
* With prepared statement caching the task of routing
|
||||
* the execution of the prepared statements to the right server would be
|
||||
* an easy one. Currently this is not supported.
|
||||
*/
|
||||
if (QUERY_IS_TYPE(qtype, QUERY_TYPE_READ))
|
||||
if (QUERY_IS_TYPE(qtype, QUERY_TYPE_READ) &&
|
||||
!( QUERY_IS_TYPE(qtype, QUERY_TYPE_PREPARE_STMT) ||
|
||||
QUERY_IS_TYPE(qtype, QUERY_TYPE_PREPARE_NAMED_STMT)))
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write_flush(
|
||||
LOGFILE_ERROR,
|
||||
|
||||
Reference in New Issue
Block a user