Use module parameters in mqfilter

Cleaned up mqfilter createInstance and converted it to use module
parameters.
This commit is contained in:
Markus Mäkelä 2017-01-08 14:50:54 +02:00
parent dd372cda17
commit 9b2d382232
3 changed files with 112 additions and 256 deletions

View File

@ -270,6 +270,22 @@ const char* config_get_string(const CONFIG_PARAMETER *params, const char *key);
*/
int config_get_enum(const CONFIG_PARAMETER *params, const char *key, const MXS_ENUM_VALUE *values);
/**
* @brief Get copy of parameter value if it is defined
*
* If a parameter with the name of @c key is defined in @c params, a copy of the
* value of that parameter is returned. The caller must free the returned string.
*
* @param params List of configuration parameters
* @param key Parameter name
*
* @return Pointer to copy of value or NULL if the parameter was not found
*
* @note The use of this function should be avoided after startup as the function
* will abort the process if memory allocation fails.
*/
char* config_copy_string(const CONFIG_PARAMETER *params, const char *key);
/**
* @brief Generate default module parameters
*

View File

@ -985,6 +985,20 @@ int config_get_enum(const CONFIG_PARAMETER *params, const char *key, const MXS_E
return found ? rv : -1;
}
char* config_copy_string(const CONFIG_PARAMETER *params, const char *key)
{
const char *value = config_get_value_string(params, key);
char *rval = NULL;
if (*value)
{
rval = MXS_STRDUP_A(value);
}
return rval;
}
CONFIG_PARAMETER* config_clone_param(const CONFIG_PARAMETER* param)
{
CONFIG_PARAMETER *p2 = MXS_MALLOC(sizeof(CONFIG_PARAMETER));

View File

@ -237,6 +237,15 @@ typedef struct
void sendMessage(void* data);
static const MXS_ENUM_VALUE trigger_values[] =
{
{"source", TRG_SOURCE},
{"schema", TRG_SCHEMA},
{"object", TRG_OBJECT},
{"all", TRG_ALL},
{NULL}
};
/**
* The module entry point routine. It is this routine that
* must populate the structure that is referred to as the
@ -275,6 +284,25 @@ MXS_MODULE* MXS_CREATE_MODULE()
NULL, /* Thread init. */
NULL, /* Thread finish. */
{
{"hostname", MXS_MODULE_PARAM_STRING, "localhost"},
{"username", MXS_MODULE_PARAM_STRING, "guest"},
{"password", MXS_MODULE_PARAM_STRING, "guest"},
{"vhost", MXS_MODULE_PARAM_STRING, "/"},
{"port", MXS_MODULE_PARAM_COUNT, "5672"},
{"exchange", MXS_MODULE_PARAM_STRING, "default_exchange"},
{"key", MXS_MODULE_PARAM_STRING, "key"},
{"queue", MXS_MODULE_PARAM_STRING},
{"ssl_client_certificate", MXS_MODULE_PARAM_PATH, NULL, MXS_MODULE_OPT_PATH_R_OK},
{"ssl_client_key", MXS_MODULE_PARAM_PATH, NULL, MXS_MODULE_OPT_PATH_R_OK},
{"ssl_CA_cert", MXS_MODULE_PARAM_PATH, NULL, MXS_MODULE_OPT_PATH_R_OK},
{"exchange_type", MXS_MODULE_PARAM_STRING, "direct"},
{"logging_trigger", MXS_MODULE_PARAM_ENUM, "all", MXS_MODULE_OPT_NONE, trigger_values},
{"logging_source_user", MXS_MODULE_PARAM_STRING},
{"logging_source_host", MXS_MODULE_PARAM_STRING},
{"logging_schema", MXS_MODULE_PARAM_STRING},
{"logging_object", MXS_MODULE_PARAM_STRING},
{"logging_log_all", MXS_MODULE_PARAM_BOOL, "false"},
{"logging_strict", MXS_MODULE_PARAM_BOOL, "true"},
{MXS_END_MODULE_PARAMS}
}
};
@ -449,10 +477,11 @@ cleanup:
* @param szstore Address where to store the size of the array after parsing
* @return The array containing the parsed string
*/
char** parse_optstr(char* str, char* tok, int* szstore)
char** parse_optstr(const char* str, const char* tok, int* szstore)
{
char *lasts, *tk = str;
char **arr;
char tmp[strlen(str) + 1];
strcpy(tmp, str);
char *lasts, *tk = tmp;
int i = 0, size = 1;
@ -461,21 +490,18 @@ char** parse_optstr(char* str, char* tok, int* szstore)
size++;
}
arr = MXS_MALLOC(sizeof(char*)*size);
if (arr == NULL)
{
*szstore = 0;
return NULL;
}
char **arr = MXS_MALLOC(sizeof(char*) * size);
MXS_ABORT_IF_NULL(arr);
*szstore = size;
tk = strtok_r(str, tok, &lasts);
tk = strtok_r(tmp, tok, &lasts);
while (tk && i < size)
{
arr[i++] = MXS_STRDUP_A(tk);
tk = strtok_r(NULL, tok, &lasts);
}
return arr;
}
@ -492,285 +518,91 @@ char** parse_optstr(char* str, char* tok, int* szstore)
static FILTER *
createInstance(const char *name, char **options, CONFIG_PARAMETER *params)
{
MQ_INSTANCE *my_instance;
int paramcount = 0, parammax = 64, i = 0, x = 0, arrsize = 0;
CONFIG_PARAMETER** paramlist;
char** arr = NULL;
char taskname[512];
MQ_INSTANCE *my_instance = MXS_CALLOC(1, sizeof(MQ_INSTANCE));
if ((my_instance = MXS_CALLOC(1, sizeof(MQ_INSTANCE))))
if (my_instance)
{
spinlock_init(&my_instance->rconn_lock);
spinlock_init(&my_instance->msg_lock);
uid_gen = 0;
paramlist = MXS_MALLOC(sizeof(CONFIG_PARAMETER*) * 64);
MXS_ABORT_IF_NULL(paramlist);
if ((my_instance->conn = amqp_new_connection()) == NULL)
{
MXS_FREE(paramlist);
MXS_FREE(my_instance);
return NULL;
}
my_instance->channel = 1;
my_instance->last_rconn = time(NULL);
my_instance->conn_stat = AMQP_STATUS_OK;
my_instance->rconn_intv = 1;
my_instance->port = 5672;
my_instance->trgtype = TRG_ALL;
my_instance->log_all = false;
my_instance->strict_logging = true;
for (const CONFIG_PARAMETER *p = p; p; p = p->next)
{
if (!strcmp(p->name, "hostname"))
{
my_instance->hostname = MXS_STRDUP_A(p->value);
}
else if (!strcmp(p->name, "username"))
{
my_instance->username = MXS_STRDUP_A(p->value);
}
else if (!strcmp(p->name, "password"))
{
my_instance->password = MXS_STRDUP_A(p->value);
}
else if (!strcmp(p->name, "vhost"))
{
my_instance->vhost = MXS_STRDUP_A(p->value);
}
else if (!strcmp(p->name, "port"))
{
my_instance->port = atoi(p->value);
}
else if (!strcmp(p->name, "exchange"))
{
my_instance->exchange = MXS_STRDUP_A(p->value);
}
else if (!strcmp(p->name, "key"))
{
my_instance->key = MXS_STRDUP_A(p->value);
}
else if (!strcmp(p->name, "queue"))
{
my_instance->queue = MXS_STRDUP_A(p->value);
}
else if (!strcmp(p->name, "ssl_client_certificate"))
{
my_instance->ssl_client_cert = MXS_STRDUP_A(p->value);
}
else if (!strcmp(p->name, "ssl_client_key"))
{
my_instance->ssl_client_key = MXS_STRDUP_A(p->value);
}
else if (!strcmp(p->name, "ssl_CA_cert"))
{
my_instance->ssl_CA_cert = MXS_STRDUP_A(p->value);
}
else if (!strcmp(p->name, "exchange_type"))
{
my_instance->exchange_type = MXS_STRDUP_A(p->value);
}
else if (!strcmp(p->name, "logging_trigger"))
{
arr = parse_optstr(p->value, ",", &arrsize);
for (x = 0; x < arrsize; x++)
{
if (!strcmp(arr[x], "source"))
{
my_instance->trgtype |= TRG_SOURCE;
}
else if (!strcmp(arr[x], "schema"))
{
my_instance->trgtype |= TRG_SCHEMA;
}
else if (!strcmp(arr[x], "object"))
{
my_instance->trgtype |= TRG_OBJECT;
}
else if (!strcmp(arr[x], "all"))
{
my_instance->trgtype = TRG_ALL;
}
else
{
MXS_ERROR("Unknown option for 'logging_trigger':%s.", arr[x]);
}
}
if (arrsize > 0)
{
for (int x = 0; x < arrsize; x++)
{
MXS_FREE(arr[x]);
}
MXS_FREE(arr);
arr = NULL;
}
arrsize = 0;
}
else if (strstr(p->name, "logging_"))
{
if (paramcount < parammax)
{
paramlist[paramcount] = MXS_MALLOC(sizeof(CONFIG_PARAMETER));
MXS_ABORT_IF_NULL(paramlist[paramcount]);
paramlist[paramcount]->name = MXS_STRDUP_A(p->name);
paramlist[paramcount]->value = MXS_STRDUP_A(p->value);
paramcount++;
}
}
}
my_instance->port = config_get_integer(params, "port");
my_instance->trgtype = config_get_enum(params, "logging_trigger", trigger_values);
my_instance->log_all = config_get_bool(params, "logging_log_all");
my_instance->strict_logging = config_get_bool(params, "logging_strict");
my_instance->hostname = MXS_STRDUP_A(config_get_string(params, "hostname"));
my_instance->username = MXS_STRDUP_A(config_get_string(params, "username"));
my_instance->password = MXS_STRDUP_A(config_get_string(params, "password"));
my_instance->vhost = MXS_STRDUP_A(config_get_string(params, "vhost"));
my_instance->exchange = MXS_STRDUP_A(config_get_string(params, "exchange"));
my_instance->key = MXS_STRDUP_A(config_get_string(params, "key"));
my_instance->exchange_type = MXS_STRDUP_A(config_get_string(params, "exchange_type"));
my_instance->queue = config_copy_string(params, "queue");
my_instance->ssl_client_cert = config_copy_string(params, "ssl_client_certificate");
my_instance->ssl_client_key = config_copy_string(params, "ssl_client_key");
my_instance->ssl_CA_cert = config_copy_string(params, "ssl_CA_cert");
if (my_instance->trgtype & TRG_SOURCE)
{
my_instance->src_trg = (SRC_TRIG*) MXS_MALLOC(sizeof(SRC_TRIG));
my_instance->src_trg = (SRC_TRIG*)MXS_CALLOC(1, sizeof(SRC_TRIG));
MXS_ABORT_IF_NULL(my_instance->src_trg);
my_instance->src_trg->user = NULL;
my_instance->src_trg->host = NULL;
my_instance->src_trg->usize = 0;
my_instance->src_trg->hsize = 0;
}
if (my_instance->trgtype & TRG_SCHEMA)
{
my_instance->shm_trg = (SHM_TRIG*) MXS_MALLOC(sizeof(SHM_TRIG));
my_instance->shm_trg = (SHM_TRIG*)MXS_CALLOC(1, sizeof(SHM_TRIG));
MXS_ABORT_IF_NULL(my_instance->shm_trg);
my_instance->shm_trg->objects = NULL;
my_instance->shm_trg->size = 0;
}
if (my_instance->trgtype & TRG_OBJECT)
{
my_instance->obj_trg = (OBJ_TRIG*) MXS_MALLOC(sizeof(OBJ_TRIG));
my_instance->obj_trg = (OBJ_TRIG*)MXS_CALLOC(1, sizeof(OBJ_TRIG));
MXS_ABORT_IF_NULL(my_instance->obj_trg);
my_instance->obj_trg->objects = NULL;
my_instance->obj_trg->size = 0;
}
for (i = 0; i < paramcount; i++)
CONFIG_PARAMETER *p = config_get_param(params, "logging_source_user");
if (p && my_instance->src_trg)
{
if (!strcmp(paramlist[i]->name, "logging_source_user"))
{
if (my_instance->src_trg)
{
my_instance->src_trg->user = parse_optstr(paramlist[i]->value, ",", &arrsize);
my_instance->src_trg->usize = arrsize;
arrsize = 0;
}
}
else if (!strcmp(paramlist[i]->name, "logging_source_host"))
{
if (my_instance->src_trg)
{
my_instance->src_trg->host = parse_optstr(paramlist[i]->value, ",", &arrsize);
my_instance->src_trg->hsize = arrsize;
arrsize = 0;
}
}
else if (!strcmp(paramlist[i]->name, "logging_schema"))
{
if (my_instance->shm_trg)
{
my_instance->shm_trg->objects = parse_optstr(paramlist[i]->value, ",", &arrsize);
my_instance->shm_trg->size = arrsize;
arrsize = 0;
}
}
else if (!strcmp(paramlist[i]->name, "logging_object"))
{
if (my_instance->obj_trg)
{
my_instance->obj_trg->objects = parse_optstr(paramlist[i]->value, ",", &arrsize);
my_instance->obj_trg->size = arrsize;
arrsize = 0;
}
}
else if (!strcmp(paramlist[i]->name, "logging_log_all"))
{
if (config_truth_value(paramlist[i]->value))
{
my_instance->log_all = true;
}
}
else if (!strcmp(paramlist[i]->name, "logging_strict"))
{
if (!config_truth_value(paramlist[i]->value))
{
my_instance->strict_logging = false;
}
}
MXS_FREE(paramlist[i]->name);
MXS_FREE(paramlist[i]->value);
MXS_FREE(paramlist[i]);
my_instance->src_trg->user = parse_optstr(p->value, ",", &my_instance->src_trg->usize);
}
MXS_FREE(paramlist);
p = config_get_param(params, "logging_source_host");
if (my_instance->hostname == NULL)
if (p && my_instance->src_trg)
{
my_instance->hostname = MXS_STRDUP_A("localhost");
}
if (my_instance->username == NULL)
{
my_instance->username = MXS_STRDUP_A("guest");
}
if (my_instance->password == NULL)
{
my_instance->password = MXS_STRDUP_A("guest");
}
if (my_instance->vhost == NULL)
{
my_instance->vhost = MXS_STRDUP_A("/");
}
if (my_instance->exchange == NULL)
{
my_instance->exchange = MXS_STRDUP_A("default_exchange");
}
if (my_instance->key == NULL)
{
my_instance->key = MXS_STRDUP_A("key");
}
if (my_instance->exchange_type == NULL)
{
my_instance->exchange_type = MXS_STRDUP_A("direct");
my_instance->src_trg->host = parse_optstr(p->value, ",", &my_instance->src_trg->hsize);
}
if (my_instance->ssl_client_cert != NULL &&
my_instance->ssl_client_key != NULL &&
my_instance->ssl_CA_cert != NULL)
p = config_get_param(params, "logging_schema");
if (p && my_instance->shm_trg)
{
my_instance->use_ssl = true;
my_instance->shm_trg->objects = parse_optstr(p->value, ",", &my_instance->shm_trg->size);
}
else
p = config_get_param(params, "logging_object");
if (p && my_instance->obj_trg)
{
my_instance->use_ssl = false;
my_instance->obj_trg->objects = parse_optstr(p->value, ",", &my_instance->obj_trg->size);
}
my_instance->use_ssl = my_instance->ssl_client_cert &&
my_instance->ssl_client_key &&
my_instance->ssl_CA_cert;
if (my_instance->use_ssl)
{
/**Assume the underlying SSL library is already initialized*/
@ -780,18 +612,12 @@ createInstance(const char *name, char **options, CONFIG_PARAMETER *params)
/**Connect to the server*/
init_conn(my_instance);
char taskname[512];
snprintf(taskname, 511, "mqtask%d", atomic_add(&hktask_id, 1));
hktask_add(taskname, sendMessage, (void*) my_instance, 5);
if (arr)
{
for (int x = 0; x < arrsize; x++)
{
MXS_FREE(arr[x]);
}
MXS_FREE(arr);
}
hktask_add(taskname, sendMessage, (void*)my_instance, 5);
}
return (FILTER *) my_instance;
return (FILTER *)my_instance;
}
/**