From 9b2d382232cbd20c9c5ca6b7f7edf8fb7a69c428 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Sun, 8 Jan 2017 14:50:54 +0200 Subject: [PATCH] Use module parameters in mqfilter Cleaned up mqfilter createInstance and converted it to use module parameters. --- include/maxscale/config.h | 16 + server/core/config.c | 14 + server/modules/filter/mqfilter/mqfilter.c | 338 ++++++---------------- 3 files changed, 112 insertions(+), 256 deletions(-) diff --git a/include/maxscale/config.h b/include/maxscale/config.h index b0fd70930..aa3eeda58 100644 --- a/include/maxscale/config.h +++ b/include/maxscale/config.h @@ -270,6 +270,22 @@ const char* config_get_string(const CONFIG_PARAMETER *params, const char *key); */ int config_get_enum(const CONFIG_PARAMETER *params, const char *key, const MXS_ENUM_VALUE *values); +/** + * @brief Get copy of parameter value if it is defined + * + * If a parameter with the name of @c key is defined in @c params, a copy of the + * value of that parameter is returned. The caller must free the returned string. + * + * @param params List of configuration parameters + * @param key Parameter name + * + * @return Pointer to copy of value or NULL if the parameter was not found + * + * @note The use of this function should be avoided after startup as the function + * will abort the process if memory allocation fails. + */ +char* config_copy_string(const CONFIG_PARAMETER *params, const char *key); + /** * @brief Generate default module parameters * diff --git a/server/core/config.c b/server/core/config.c index a1d92b620..007b7dfea 100644 --- a/server/core/config.c +++ b/server/core/config.c @@ -985,6 +985,20 @@ int config_get_enum(const CONFIG_PARAMETER *params, const char *key, const MXS_E return found ? rv : -1; } +char* config_copy_string(const CONFIG_PARAMETER *params, const char *key) +{ + const char *value = config_get_value_string(params, key); + + char *rval = NULL; + + if (*value) + { + rval = MXS_STRDUP_A(value); + } + + return rval; +} + CONFIG_PARAMETER* config_clone_param(const CONFIG_PARAMETER* param) { CONFIG_PARAMETER *p2 = MXS_MALLOC(sizeof(CONFIG_PARAMETER)); diff --git a/server/modules/filter/mqfilter/mqfilter.c b/server/modules/filter/mqfilter/mqfilter.c index c273c37ac..25e61c534 100644 --- a/server/modules/filter/mqfilter/mqfilter.c +++ b/server/modules/filter/mqfilter/mqfilter.c @@ -237,6 +237,15 @@ typedef struct void sendMessage(void* data); +static const MXS_ENUM_VALUE trigger_values[] = +{ + {"source", TRG_SOURCE}, + {"schema", TRG_SCHEMA}, + {"object", TRG_OBJECT}, + {"all", TRG_ALL}, + {NULL} +}; + /** * The module entry point routine. It is this routine that * must populate the structure that is referred to as the @@ -275,6 +284,25 @@ MXS_MODULE* MXS_CREATE_MODULE() NULL, /* Thread init. */ NULL, /* Thread finish. */ { + {"hostname", MXS_MODULE_PARAM_STRING, "localhost"}, + {"username", MXS_MODULE_PARAM_STRING, "guest"}, + {"password", MXS_MODULE_PARAM_STRING, "guest"}, + {"vhost", MXS_MODULE_PARAM_STRING, "/"}, + {"port", MXS_MODULE_PARAM_COUNT, "5672"}, + {"exchange", MXS_MODULE_PARAM_STRING, "default_exchange"}, + {"key", MXS_MODULE_PARAM_STRING, "key"}, + {"queue", MXS_MODULE_PARAM_STRING}, + {"ssl_client_certificate", MXS_MODULE_PARAM_PATH, NULL, MXS_MODULE_OPT_PATH_R_OK}, + {"ssl_client_key", MXS_MODULE_PARAM_PATH, NULL, MXS_MODULE_OPT_PATH_R_OK}, + {"ssl_CA_cert", MXS_MODULE_PARAM_PATH, NULL, MXS_MODULE_OPT_PATH_R_OK}, + {"exchange_type", MXS_MODULE_PARAM_STRING, "direct"}, + {"logging_trigger", MXS_MODULE_PARAM_ENUM, "all", MXS_MODULE_OPT_NONE, trigger_values}, + {"logging_source_user", MXS_MODULE_PARAM_STRING}, + {"logging_source_host", MXS_MODULE_PARAM_STRING}, + {"logging_schema", MXS_MODULE_PARAM_STRING}, + {"logging_object", MXS_MODULE_PARAM_STRING}, + {"logging_log_all", MXS_MODULE_PARAM_BOOL, "false"}, + {"logging_strict", MXS_MODULE_PARAM_BOOL, "true"}, {MXS_END_MODULE_PARAMS} } }; @@ -449,10 +477,11 @@ cleanup: * @param szstore Address where to store the size of the array after parsing * @return The array containing the parsed string */ -char** parse_optstr(char* str, char* tok, int* szstore) +char** parse_optstr(const char* str, const char* tok, int* szstore) { - char *lasts, *tk = str; - char **arr; + char tmp[strlen(str) + 1]; + strcpy(tmp, str); + char *lasts, *tk = tmp; int i = 0, size = 1; @@ -461,21 +490,18 @@ char** parse_optstr(char* str, char* tok, int* szstore) size++; } - arr = MXS_MALLOC(sizeof(char*)*size); - - if (arr == NULL) - { - *szstore = 0; - return NULL; - } + char **arr = MXS_MALLOC(sizeof(char*) * size); + MXS_ABORT_IF_NULL(arr); *szstore = size; - tk = strtok_r(str, tok, &lasts); + tk = strtok_r(tmp, tok, &lasts); + while (tk && i < size) { arr[i++] = MXS_STRDUP_A(tk); tk = strtok_r(NULL, tok, &lasts); } + return arr; } @@ -492,285 +518,91 @@ char** parse_optstr(char* str, char* tok, int* szstore) static FILTER * createInstance(const char *name, char **options, CONFIG_PARAMETER *params) { - MQ_INSTANCE *my_instance; - int paramcount = 0, parammax = 64, i = 0, x = 0, arrsize = 0; - CONFIG_PARAMETER** paramlist; - char** arr = NULL; - char taskname[512]; + MQ_INSTANCE *my_instance = MXS_CALLOC(1, sizeof(MQ_INSTANCE)); - if ((my_instance = MXS_CALLOC(1, sizeof(MQ_INSTANCE)))) + if (my_instance) { spinlock_init(&my_instance->rconn_lock); spinlock_init(&my_instance->msg_lock); uid_gen = 0; - paramlist = MXS_MALLOC(sizeof(CONFIG_PARAMETER*) * 64); - MXS_ABORT_IF_NULL(paramlist); if ((my_instance->conn = amqp_new_connection()) == NULL) { - MXS_FREE(paramlist); MXS_FREE(my_instance); return NULL; } + my_instance->channel = 1; my_instance->last_rconn = time(NULL); my_instance->conn_stat = AMQP_STATUS_OK; my_instance->rconn_intv = 1; - my_instance->port = 5672; - my_instance->trgtype = TRG_ALL; - my_instance->log_all = false; - my_instance->strict_logging = true; - for (const CONFIG_PARAMETER *p = p; p; p = p->next) - { - if (!strcmp(p->name, "hostname")) - { - my_instance->hostname = MXS_STRDUP_A(p->value); - } - else if (!strcmp(p->name, "username")) - { - my_instance->username = MXS_STRDUP_A(p->value); - } - else if (!strcmp(p->name, "password")) - { - my_instance->password = MXS_STRDUP_A(p->value); - } - else if (!strcmp(p->name, "vhost")) - { - my_instance->vhost = MXS_STRDUP_A(p->value); - } - else if (!strcmp(p->name, "port")) - { - my_instance->port = atoi(p->value); - } - else if (!strcmp(p->name, "exchange")) - { - my_instance->exchange = MXS_STRDUP_A(p->value); - } - else if (!strcmp(p->name, "key")) - { - my_instance->key = MXS_STRDUP_A(p->value); - } - else if (!strcmp(p->name, "queue")) - { - my_instance->queue = MXS_STRDUP_A(p->value); - } - else if (!strcmp(p->name, "ssl_client_certificate")) - { - my_instance->ssl_client_cert = MXS_STRDUP_A(p->value); - } - else if (!strcmp(p->name, "ssl_client_key")) - { - - my_instance->ssl_client_key = MXS_STRDUP_A(p->value); - } - else if (!strcmp(p->name, "ssl_CA_cert")) - { - - my_instance->ssl_CA_cert = MXS_STRDUP_A(p->value); - } - else if (!strcmp(p->name, "exchange_type")) - { - - my_instance->exchange_type = MXS_STRDUP_A(p->value); - } - else if (!strcmp(p->name, "logging_trigger")) - { - - arr = parse_optstr(p->value, ",", &arrsize); - - for (x = 0; x < arrsize; x++) - { - if (!strcmp(arr[x], "source")) - { - my_instance->trgtype |= TRG_SOURCE; - } - else if (!strcmp(arr[x], "schema")) - { - my_instance->trgtype |= TRG_SCHEMA; - } - else if (!strcmp(arr[x], "object")) - { - my_instance->trgtype |= TRG_OBJECT; - } - else if (!strcmp(arr[x], "all")) - { - my_instance->trgtype = TRG_ALL; - } - else - { - MXS_ERROR("Unknown option for 'logging_trigger':%s.", arr[x]); - } - } - - if (arrsize > 0) - { - for (int x = 0; x < arrsize; x++) - { - MXS_FREE(arr[x]); - } - MXS_FREE(arr); - arr = NULL; - } - arrsize = 0; - - - - } - else if (strstr(p->name, "logging_")) - { - - if (paramcount < parammax) - { - paramlist[paramcount] = MXS_MALLOC(sizeof(CONFIG_PARAMETER)); - MXS_ABORT_IF_NULL(paramlist[paramcount]); - paramlist[paramcount]->name = MXS_STRDUP_A(p->name); - paramlist[paramcount]->value = MXS_STRDUP_A(p->value); - paramcount++; - } - } - } + my_instance->port = config_get_integer(params, "port"); + my_instance->trgtype = config_get_enum(params, "logging_trigger", trigger_values); + my_instance->log_all = config_get_bool(params, "logging_log_all"); + my_instance->strict_logging = config_get_bool(params, "logging_strict"); + my_instance->hostname = MXS_STRDUP_A(config_get_string(params, "hostname")); + my_instance->username = MXS_STRDUP_A(config_get_string(params, "username")); + my_instance->password = MXS_STRDUP_A(config_get_string(params, "password")); + my_instance->vhost = MXS_STRDUP_A(config_get_string(params, "vhost")); + my_instance->exchange = MXS_STRDUP_A(config_get_string(params, "exchange")); + my_instance->key = MXS_STRDUP_A(config_get_string(params, "key")); + my_instance->exchange_type = MXS_STRDUP_A(config_get_string(params, "exchange_type")); + my_instance->queue = config_copy_string(params, "queue"); + my_instance->ssl_client_cert = config_copy_string(params, "ssl_client_certificate"); + my_instance->ssl_client_key = config_copy_string(params, "ssl_client_key"); + my_instance->ssl_CA_cert = config_copy_string(params, "ssl_CA_cert"); if (my_instance->trgtype & TRG_SOURCE) { - - my_instance->src_trg = (SRC_TRIG*) MXS_MALLOC(sizeof(SRC_TRIG)); + my_instance->src_trg = (SRC_TRIG*)MXS_CALLOC(1, sizeof(SRC_TRIG)); MXS_ABORT_IF_NULL(my_instance->src_trg); - my_instance->src_trg->user = NULL; - my_instance->src_trg->host = NULL; - my_instance->src_trg->usize = 0; - my_instance->src_trg->hsize = 0; - } if (my_instance->trgtype & TRG_SCHEMA) { - - my_instance->shm_trg = (SHM_TRIG*) MXS_MALLOC(sizeof(SHM_TRIG)); + my_instance->shm_trg = (SHM_TRIG*)MXS_CALLOC(1, sizeof(SHM_TRIG)); MXS_ABORT_IF_NULL(my_instance->shm_trg); - my_instance->shm_trg->objects = NULL; - my_instance->shm_trg->size = 0; - } if (my_instance->trgtype & TRG_OBJECT) { - - my_instance->obj_trg = (OBJ_TRIG*) MXS_MALLOC(sizeof(OBJ_TRIG)); + my_instance->obj_trg = (OBJ_TRIG*)MXS_CALLOC(1, sizeof(OBJ_TRIG)); MXS_ABORT_IF_NULL(my_instance->obj_trg); - my_instance->obj_trg->objects = NULL; - my_instance->obj_trg->size = 0; - } - for (i = 0; i < paramcount; i++) + CONFIG_PARAMETER *p = config_get_param(params, "logging_source_user"); + + if (p && my_instance->src_trg) { - - if (!strcmp(paramlist[i]->name, "logging_source_user")) - { - - if (my_instance->src_trg) - { - my_instance->src_trg->user = parse_optstr(paramlist[i]->value, ",", &arrsize); - my_instance->src_trg->usize = arrsize; - arrsize = 0; - } - - } - else if (!strcmp(paramlist[i]->name, "logging_source_host")) - { - - if (my_instance->src_trg) - { - my_instance->src_trg->host = parse_optstr(paramlist[i]->value, ",", &arrsize); - my_instance->src_trg->hsize = arrsize; - arrsize = 0; - } - - } - else if (!strcmp(paramlist[i]->name, "logging_schema")) - { - - if (my_instance->shm_trg) - { - my_instance->shm_trg->objects = parse_optstr(paramlist[i]->value, ",", &arrsize); - my_instance->shm_trg->size = arrsize; - arrsize = 0; - } - - } - else if (!strcmp(paramlist[i]->name, "logging_object")) - { - - if (my_instance->obj_trg) - { - my_instance->obj_trg->objects = parse_optstr(paramlist[i]->value, ",", &arrsize); - my_instance->obj_trg->size = arrsize; - arrsize = 0; - } - - } - else if (!strcmp(paramlist[i]->name, "logging_log_all")) - { - if (config_truth_value(paramlist[i]->value)) - { - my_instance->log_all = true; - } - } - else if (!strcmp(paramlist[i]->name, "logging_strict")) - { - if (!config_truth_value(paramlist[i]->value)) - { - my_instance->strict_logging = false; - } - } - MXS_FREE(paramlist[i]->name); - MXS_FREE(paramlist[i]->value); - MXS_FREE(paramlist[i]); + my_instance->src_trg->user = parse_optstr(p->value, ",", &my_instance->src_trg->usize); } - MXS_FREE(paramlist); + p = config_get_param(params, "logging_source_host"); - if (my_instance->hostname == NULL) + if (p && my_instance->src_trg) { - my_instance->hostname = MXS_STRDUP_A("localhost"); - } - if (my_instance->username == NULL) - { - my_instance->username = MXS_STRDUP_A("guest"); - } - if (my_instance->password == NULL) - { - my_instance->password = MXS_STRDUP_A("guest"); - } - if (my_instance->vhost == NULL) - { - my_instance->vhost = MXS_STRDUP_A("/"); - } - if (my_instance->exchange == NULL) - { - my_instance->exchange = MXS_STRDUP_A("default_exchange"); - } - if (my_instance->key == NULL) - { - my_instance->key = MXS_STRDUP_A("key"); - } - if (my_instance->exchange_type == NULL) - { - my_instance->exchange_type = MXS_STRDUP_A("direct"); + my_instance->src_trg->host = parse_optstr(p->value, ",", &my_instance->src_trg->hsize); } - if (my_instance->ssl_client_cert != NULL && - my_instance->ssl_client_key != NULL && - my_instance->ssl_CA_cert != NULL) + p = config_get_param(params, "logging_schema"); + + if (p && my_instance->shm_trg) { - my_instance->use_ssl = true; + my_instance->shm_trg->objects = parse_optstr(p->value, ",", &my_instance->shm_trg->size); } - else + + p = config_get_param(params, "logging_object"); + + if (p && my_instance->obj_trg) { - my_instance->use_ssl = false; + my_instance->obj_trg->objects = parse_optstr(p->value, ",", &my_instance->obj_trg->size); } + my_instance->use_ssl = my_instance->ssl_client_cert && + my_instance->ssl_client_key && + my_instance->ssl_CA_cert; + if (my_instance->use_ssl) { /**Assume the underlying SSL library is already initialized*/ @@ -780,18 +612,12 @@ createInstance(const char *name, char **options, CONFIG_PARAMETER *params) /**Connect to the server*/ init_conn(my_instance); + char taskname[512]; snprintf(taskname, 511, "mqtask%d", atomic_add(&hktask_id, 1)); - hktask_add(taskname, sendMessage, (void*) my_instance, 5); - if (arr) - { - for (int x = 0; x < arrsize; x++) - { - MXS_FREE(arr[x]); - } - MXS_FREE(arr); - } + hktask_add(taskname, sendMessage, (void*)my_instance, 5); } - return (FILTER *) my_instance; + + return (FILTER *)my_instance; } /**