From c4999232ce31ba4e0607cf2e04ce3b93f8cc2d95 Mon Sep 17 00:00:00 2001 From: Johan Wikman Date: Mon, 14 Nov 2016 16:17:31 +0200 Subject: [PATCH] MXS-935: Implement column matching With the advent of qc_get_field_info, columns can now be matched. However, there is still some undeterminism caused by the table information not containing contextual information (exactly where is the table used). Further, suppose table X contains the column A and table Y contains the column B, then given a statement like SELECT a, b from X, Z; we cannot know whether a is in X or Z, or b in X or Z, without being aware of the schema, which we currently are not. Consequently, as long as MaxScale is not aware of the schema, some heuristics must be applied. For instance, if exactly one table is referred to, then we can assume that columns that are not explicitly qualified are from that table. The rule tests are currently rather rudimentary and need to be expanded. --- Documentation/Filters/Cache.md | 38 +- server/modules/filter/cache/rules.c | 887 +++++++++++++++---- server/modules/filter/cache/rules.h | 10 +- server/modules/filter/cache/test/testrules.c | 45 +- 4 files changed, 827 insertions(+), 153 deletions(-) diff --git a/Documentation/Filters/Cache.md b/Documentation/Filters/Cache.md index 53372dcfc..8679f58c3 100644 --- a/Documentation/Filters/Cache.md +++ b/Documentation/Filters/Cache.md @@ -173,8 +173,11 @@ where, * the _op_ can be `=`, `!=`, `like` or `unlike`, and * the _value_ a string. -If _op_ is `=` or `!=` then _value_ is used verbatim; if it is `like` +If _op_ is `=` or `!=` then _value_ is used as a string; if it is `like` or `unlike`, then _value_ is interpreted as a _pcre2_ regular expression. +Note though that if _attribute_ is `database`, `table` or `column`, then +the string is interpreted as a name, where a dot `.` denotes qualification +or scoping. The objects in the `store` array are processed in order. If the result of a comparison is _true_, no further processing will be made and the @@ -206,6 +209,39 @@ select * from tbl where b = 3 and a = 2; as well. Although they conceptually are identical, there will be two cache entries. +### Qualified Names + +When using `=` or `!=` in the rule object in conjunction with `database`, +`table` and `column`, the provided string is interpreted as a name, that is, +dot (`.`) denotes qualification or scope. + +In practice that means that if _attribute_ is `database` then _value_ may +not contain a dot, if _attribute_ is `table` then _value_ may contain one +dot, used for separating the database and table names respectively, and +if _attribute_ is `column` then _value_ may contain one or two dots, used +for separating table and column names, or database, table and column names. + +Note that if a qualified name is used as a _value_, then all parts of the +name must be available for a match. Currently Maria DB MaxScale may not +always be capable of deducing in what table a particular column is. If +that is the case, then a value like `tbl.field` may not necessarily +be a match even if the field is `field` and the table actually is `tbl`. + +### Implication of the _default_ database. + +If the rules concerns the `database`, then only if the statement refers +to *no* specific database, will the default database be considered. + +### Regexp Matching + +The string used for matching the regular expression contains as much +information as there is available. For instance, in a situation like +``` +use somedb; +select fld from tbl; +``` +the string matched against the regular expression will be `somedb.tbl.fld`. + ### Examples Cache all queries targeting a particular database. diff --git a/server/modules/filter/cache/rules.c b/server/modules/filter/cache/rules.c index 7dd5bbdf0..c4917aaa5 100644 --- a/server/modules/filter/cache/rules.c +++ b/server/modules/filter/cache/rules.c @@ -76,10 +76,28 @@ static CACHE_RULE *cache_rule_create_simple(cache_rule_attribute_t attribute, cache_rule_op_t op, const char *value, uint32_t debug); +static CACHE_RULE *cache_rule_create_simple_ctd(cache_rule_attribute_t attribute, + cache_rule_op_t op, + const char *cvalue, + uint32_t debug); +static CACHE_RULE *cache_rule_create_simple_user(cache_rule_attribute_t attribute, + cache_rule_op_t op, + const char *cvalue, + uint32_t debug); +static CACHE_RULE *cache_rule_create_simple_query(cache_rule_attribute_t attribute, + cache_rule_op_t op, + const char *cvalue, + uint32_t debug); static CACHE_RULE *cache_rule_create(cache_rule_attribute_t attribute, cache_rule_op_t op, const char *value, uint32_t debug); +static bool cache_rule_matches_column_regexp(CACHE_RULE *rule, + const char *default_db, + const GWBUF *query); +static bool cache_rule_matches_column_simple(CACHE_RULE *rule, + const char *default_db, + const GWBUF *query); static bool cache_rule_matches_column(CACHE_RULE *rule, const char *default_db, const GWBUF *query); @@ -92,6 +110,12 @@ static bool cache_rule_matches_query(CACHE_RULE *rule, static bool cache_rule_matches_table(CACHE_RULE *rule, const char *default_db, const GWBUF *query); +static bool cache_rule_matches_table_regexp(CACHE_RULE *rule, + const char *default_db, + const GWBUF *query); +static bool cache_rule_matches_table_simple(CACHE_RULE *rule, + const char *default_db, + const GWBUF *query); static bool cache_rule_matches_user(CACHE_RULE *rule, const char *user); static bool cache_rule_matches(CACHE_RULE *rule, const char *default_db, @@ -366,7 +390,6 @@ bool cache_rules_should_use(CACHE_RULES *self, const SESSION *session) while (rule && !should_use) { should_use = cache_rule_matches_user(rule, account); - rule = rule->next; } } @@ -515,11 +538,22 @@ static CACHE_RULE *cache_rule_create_regexp(cache_rule_attribute_t attribute, return rule; } -static CACHE_RULE *cache_rule_create_user(cache_rule_attribute_t attribute, - cache_rule_op_t op, - const char *cvalue, - uint32_t debug) +/** + * Creates a CACHE_RULE object matching users. + * + * @param attribute CACHE_ATTRIBUTE_USER + * @param op An operator, CACHE_OP_EQ or CACHE_OP_NEQ. + * @param cvalue A string in the MySQL user format. + * @param debug The debug level. + * + * @return A new rule object or NULL in case of failure. + */ +static CACHE_RULE *cache_rule_create_simple_user(cache_rule_attribute_t attribute, + cache_rule_op_t op, + const char *cvalue, + uint32_t debug) { + ss_dassert(attribute == CACHE_ATTRIBUTE_USER); ss_dassert((op == CACHE_OP_EQ) || (op == CACHE_OP_NEQ)); CACHE_RULE *rule = NULL; @@ -608,6 +642,211 @@ static CACHE_RULE *cache_rule_create_user(cache_rule_attribute_t attribute, return rule; } +/** + * Creates a CACHE_RULE object matching column/table/database. + * + * @param attribute CACHE_ATTRIBUTE_[COLUMN|TABLE|DATABASE] + * @param op An operator, CACHE_OP_EQ or CACHE_OP_NEQ. + * @param cvalue A name, with 0, 1 or 2 dots. + * @param debug The debug level. + * + * @return A new rule object or NULL in case of failure. + */ +static CACHE_RULE *cache_rule_create_simple_ctd(cache_rule_attribute_t attribute, + cache_rule_op_t op, + const char *cvalue, + uint32_t debug) +{ + ss_dassert((attribute == CACHE_ATTRIBUTE_COLUMN) || + (attribute == CACHE_ATTRIBUTE_TABLE) || + (attribute == CACHE_ATTRIBUTE_DATABASE)); + ss_dassert((op == CACHE_OP_EQ) || (op == CACHE_OP_NEQ)); + + CACHE_RULE *rule = (CACHE_RULE*)MXS_CALLOC(1, sizeof(CACHE_RULE)); + char *value = MXS_STRDUP(cvalue); + + if (rule && value) + { + rule->attribute = attribute; + rule->op = op; + rule->value = value; + rule->debug = debug; + + bool allocation_failed = false; + + char buffer[strlen(value) + 1]; + strcpy(buffer, value); + + const char* first = NULL; + const char* second = NULL; + const char* third = NULL; + char* dot1 = strchr(buffer, '.'); + char* dot2 = dot1 ? strchr(dot1 + 1, '.') : NULL; + + if (dot1 && dot2) + { + first = buffer; + *dot1 = 0; + second = dot1 + 1; + *dot2 = 0; + third = dot2 + 1; + } + else if (dot1) + { + first = buffer; + *dot1 = 0; + second = dot1 + 1; + } + else + { + first = buffer; + } + + switch (attribute) + { + case CACHE_ATTRIBUTE_COLUMN: + { + if (third) // implies also 'first' and 'second' + { + rule->simple.column = MXS_STRDUP(third); + rule->simple.table = MXS_STRDUP(second); + rule->simple.database = MXS_STRDUP(first); + + if (!rule->simple.column || !rule->simple.table || !rule->simple.database) + { + allocation_failed = true; + } + } + else if (second) // implies also 'first' + { + rule->simple.column = MXS_STRDUP(second); + rule->simple.table = MXS_STRDUP(first); + + if (!rule->simple.column || !rule->simple.table) + { + allocation_failed = true; + } + } + else // only 'first' + { + rule->simple.column = MXS_STRDUP(first); + + if (!rule->simple.column) + { + allocation_failed = true; + } + } + } + break; + + case CACHE_ATTRIBUTE_TABLE: + if (third) + { + MXS_ERROR("A cache rule value for matching a table name, cannot contain two dots: '%s'", + cvalue); + allocation_failed = true; + } + else + { + if (second) // implies also 'first' + { + rule->simple.database = MXS_STRDUP(first); + rule->simple.table = MXS_STRDUP(second); + if (!rule->simple.database || !rule->simple.table) + { + allocation_failed = true; + } + } + else // only 'first' + { + rule->simple.table = MXS_STRDUP(first); + if (!rule->simple.table) + { + allocation_failed = true; + } + } + } + break; + + case CACHE_ATTRIBUTE_DATABASE: + if (second) + { + MXS_ERROR("A cache rule value for matching a database, cannot contain a dot: '%s'", + cvalue); + allocation_failed = true; + } + else + { + rule->simple.database = MXS_STRDUP(first); + if (!rule->simple.database) + { + allocation_failed = true; + } + } + break; + + default: + ss_dassert(!true); + } + + if (allocation_failed) + { + MXS_FREE(rule->simple.column); + MXS_FREE(rule->simple.table); + MXS_FREE(rule->simple.database); + MXS_FREE(value); + MXS_FREE(rule); + rule = NULL; + } + } + else + { + MXS_FREE(value); + MXS_FREE(rule); + rule = NULL; + } + + return rule; +} + +/** + * Creates a CACHE_RULE object matching an entire query. + * + * @param attribute CACHE_ATTRIBUTE_QUERY. + * @param op An operator, CACHE_OP_EQ or CACHE_OP_NEQ. + * @param cvalue A string. + * @param debug The debug level. + * + * @return A new rule object or NULL in case of failure. + */ +static CACHE_RULE *cache_rule_create_simple_query(cache_rule_attribute_t attribute, + cache_rule_op_t op, + const char *cvalue, + uint32_t debug) +{ + ss_dassert(attribute == CACHE_ATTRIBUTE_QUERY); + ss_dassert((op == CACHE_OP_EQ) || (op == CACHE_OP_NEQ)); + + CACHE_RULE *rule = MXS_CALLOC(1, sizeof(CACHE_RULE)); + char *value = MXS_STRDUP(cvalue); + + if (rule && value) + { + rule->attribute = attribute; + rule->op = op; + rule->debug = debug; + rule->value = value; + } + else + { + MXS_FREE(value); + MXS_FREE(rule); + rule = NULL; + } + + return rule; +} + /** * Creates a CACHE_RULE object doing simple matching. * @@ -627,28 +866,25 @@ static CACHE_RULE *cache_rule_create_simple(cache_rule_attribute_t attribute, CACHE_RULE *rule; - if (attribute == CACHE_ATTRIBUTE_USER) + switch (attribute) { - rule = cache_rule_create_user(attribute, op, cvalue, debug); - } - else - { - rule = (CACHE_RULE*)MXS_CALLOC(1, sizeof(CACHE_RULE)); - char *value = MXS_STRDUP(cvalue); + case CACHE_ATTRIBUTE_COLUMN: + case CACHE_ATTRIBUTE_TABLE: + case CACHE_ATTRIBUTE_DATABASE: + rule = cache_rule_create_simple_ctd(attribute, op, cvalue, debug); + break; - if (rule && value) - { - rule->attribute = attribute; - rule->op = op; - rule->debug = debug; - rule->value = value; - } - else - { - MXS_FREE(rule); - MXS_FREE(value); - rule = NULL; - } + case CACHE_ATTRIBUTE_USER: + rule = cache_rule_create_simple_user(attribute, op, cvalue, debug); + break; + + case CACHE_ATTRIBUTE_QUERY: + rule = cache_rule_create_simple_query(attribute, op, cvalue, debug); + break; + + default: + MXS_ERROR("Unknown attribute type: %d", (int)attribute); + ss_dassert(!true); } return rule; @@ -708,7 +944,13 @@ static void cache_rule_free(CACHE_RULE* rule) MXS_FREE(rule->value); - if ((rule->op == CACHE_OP_LIKE) || (rule->op == CACHE_OP_UNLIKE)) + if ((rule->op == CACHE_OP_EQ) || (rule->op == CACHE_OP_NEQ)) + { + MXS_FREE(rule->simple.column); + MXS_FREE(rule->simple.table); + MXS_FREE(rule->simple.database); + } + else if ((rule->op == CACHE_OP_LIKE) || (rule->op == CACHE_OP_UNLIKE)) { pcre2_match_data_free(rule->regexp.data); pcre2_code_free(rule->regexp.code); @@ -728,7 +970,25 @@ static void cache_rule_free(CACHE_RULE* rule) */ static bool cache_rule_compare(CACHE_RULE *self, const char *value) { - return cache_rule_compare_n(self, value, strlen(value)); + bool rv; + + if (value) + { + rv = cache_rule_compare_n(self, value, strlen(value)); + } + else + { + if ((self->op == CACHE_OP_EQ) || (self->op == CACHE_OP_LIKE)) + { + rv = false; + } + else + { + rv = true; + } + } + + return rv; } /** @@ -779,44 +1039,35 @@ static bool cache_rule_compare_n(CACHE_RULE *self, const char *value, size_t len * * @return True, if the rule matches, false otherwise. */ -static bool cache_rule_matches_column(CACHE_RULE *self, const char *default_db, const GWBUF *query) +static bool cache_rule_matches_column_regexp(CACHE_RULE *self, const char *default_db, const GWBUF *query) { ss_dassert(self->attribute == CACHE_ATTRIBUTE_COLUMN); + ss_dassert((self->op == CACHE_OP_LIKE) || (self->op == CACHE_OP_UNLIKE)); - // TODO: Do this "parsing" when the rule item is created. - char buffer[strlen(self->value) + 1]; - strcpy(buffer, self->value); + const char* default_database = NULL; - const char* rule_column = NULL; - const char* rule_table = NULL; - const char* rule_database = NULL; - char* dot1 = strchr(buffer, '.'); - char* dot2 = dot1 ? strchr(buffer, '.') : NULL; + int n_databases; + char **databases = qc_get_database_names((GWBUF*)query, &n_databases); - if (dot1 && dot2) + if (n_databases == 0) { - rule_database = buffer; - *dot1 = 0; - rule_table = dot1 + 1; - *dot2 = 0; - rule_column = dot2 + 1; + // If no databases have been mentioned, then we can assume that all + // tables and columns that are not explcitly qualified refer to the + // default database. + default_database = default_db; } - else if (dot1) + else if ((default_db == NULL) && (n_databases == 1)) { - rule_table = buffer; - *dot1 = 0; - rule_column = dot1 + 1; - } - else - { - rule_column = buffer; + // If there is no default database and exactly one database has been + // explicitly mentioned, then we can assume all tables and columns that + // are not explicitly qualified refer to that database. + default_database = databases[0]; } - const QC_FIELD_INFO *infos; - size_t n_infos; + size_t default_database_len = default_database ? strlen(default_database) : 0; int n_tables; - char** tables = qc_get_table_names((GWBUF*)query, &n_tables, false); + char **tables = qc_get_table_names((GWBUF*)query, &n_tables, false); const char* default_table = NULL; @@ -827,6 +1078,11 @@ static bool cache_rule_matches_column(CACHE_RULE *self, const char *default_db, default_table = tables[0]; } + size_t default_table_len = default_table ? strlen(default_table) : 0; + + const QC_FIELD_INFO *infos; + size_t n_infos; + qc_get_field_info((GWBUF*)query, &infos, &n_infos); bool matches = false; @@ -836,49 +1092,214 @@ static bool cache_rule_matches_column(CACHE_RULE *self, const char *default_db, { const QC_FIELD_INFO *info = (infos + i); - if ((strcmp(info->column, rule_column) == 0) || (strcmp(info->column, "*") == 0)) + if (info->usage & QC_USED_IN_SELECT) { - if (rule_table) + size_t database_len; + const char *database; + + if (info->database) { - const char* check_table = info->table ? info->table : default_table; + database = info->database; + database_len = strlen(info->database); + } + else + { + database = default_database; + database_len = default_database_len; + } - if (check_table && (strcmp(check_table, rule_table) == 0)) + size_t table_len; + const char *table; + + if (info->table) + { + table = info->table; + table_len = strlen(info->table); + } + else + { + table = default_table; + table_len = default_table_len; + } + + char buffer[database_len + 1 + table_len + strlen(info->column) + 1]; + buffer[0] = 0; + + if (database) + { + strcat(buffer, database); + strcat(buffer, "."); + } + + if (table) + { + strcat(buffer, table); + strcat(buffer, "."); + } + + strcat(buffer, info->column); + + matches = cache_rule_compare(self, buffer); + } + + ++i; + } + + if (tables) + { + for (i = 0; i < (size_t)n_tables; ++i) + { + MXS_FREE(tables[i]); + } + MXS_FREE(tables); + } + + if (databases) + { + for (i = 0; i < (size_t)n_databases; ++i) + { + MXS_FREE(databases[i]); + } + MXS_FREE(databases); + } + + return matches; +} + +/** + * Returns boolean indicating whether the column rule matches the query or not. + * + * @param self The CACHE_RULE object. + * @param default_db The current default db. + * @param query The query. + * + * @return True, if the rule matches, false otherwise. + */ +static bool cache_rule_matches_column_simple(CACHE_RULE *self, const char *default_db, const GWBUF *query) +{ + ss_dassert(self->attribute == CACHE_ATTRIBUTE_COLUMN); + ss_dassert((self->op == CACHE_OP_EQ) || (self->op == CACHE_OP_NEQ)); + + const char* rule_column = self->simple.column; + const char* rule_table = self->simple.table; + const char* rule_database = self->simple.database; + + const char* default_database = NULL; + + int n_databases; + char **databases = qc_get_database_names((GWBUF*)query, &n_databases); + + if (n_databases == 0) + { + // If no databases have been mentioned, then we can assume that all + // tables and columns that are not explcitly qualified refer to the + // default database. + default_database = default_db; + } + else if ((default_db == NULL) && (n_databases == 1)) + { + // If there is no default database and exactly one database has been + // explicitly mentioned, then we can assume all tables and columns that + // are not explicitly qualified refer to that database. + default_database = databases[0]; + } + + int n_tables; + char **tables = qc_get_table_names((GWBUF*)query, &n_tables, false); + + const char* default_table = NULL; + + if (n_tables == 1) + { + // Only if we have exactly one table can we assume anything + // about a table that has not been mentioned explicitly. + default_table = tables[0]; + } + + const QC_FIELD_INFO *infos; + size_t n_infos; + + qc_get_field_info((GWBUF*)query, &infos, &n_infos); + + bool matches = false; + + size_t i = 0; + while (!matches && (i < n_infos)) + { + const QC_FIELD_INFO *info = (infos + i); + + if (info->usage & QC_USED_IN_SELECT) + { + if ((strcasecmp(info->column, rule_column) == 0) || strcmp(rule_column, "*") == 0) + { + if (rule_table) { - if (rule_database) - { - const char *check_database = info->database ? info->database : default_db; + const char* check_table = info->table ? info->table : default_table; - if (check_database && (strcmp(check_database, rule_database) == 0)) + if (check_table) + { + if (strcasecmp(check_table, rule_table) == 0) { - matches = true; + if (rule_database) + { + const char *check_database = + info->database ? info->database : default_database; + + if (check_database) + { + if (strcasecmp(check_database, rule_database) == 0) + { + // The column, table and database matched. + matches = true; + } + else + { + // The column, table matched but the database did not. + matches = false; + } + } + else + { + // If the rules specify a database but we do not know the database, + // we consider the databases not to match. + matches = false; + } + } + else + { + // If the rule specifies no database, then if the column and the table + // matches, the rule matches. + matches = true; + } } else { - // If the rules specifies a database and either the database - // does not match or we do not know the database, the rule - // does *not* match. + // The column matched, but the table did not. matches = false; } } else { - // If the rule specifies no table, then if the table and column matches, - // the rule matches. - matches = true; + // If the rules specify a table but we do not know the table, we + // consider the tables not to match. + matches = false; } } else { - // The rules specifies a table and either the table does not match - // or we do not know the table, the rule does *not* match. - matches = false; + // The column matched and there is no table rule. + matches = true; } } else { - // If the rule specifies no table, then if the column matches, the - // rule matches. - matches = true; + // The column did not match. + matches = false; + } + + if (self->op == CACHE_OP_NEQ) + { + matches = !matches; } } @@ -894,6 +1315,49 @@ static bool cache_rule_matches_column(CACHE_RULE *self, const char *default_db, MXS_FREE(tables); } + if (databases) + { + for (i = 0; i < (size_t)n_databases; ++i) + { + MXS_FREE(databases[i]); + } + MXS_FREE(databases); + } + + return matches; +} + +/** + * Returns boolean indicating whether the column rule matches the query or not. + * + * @param self The CACHE_RULE object. + * @param default_db The current default db. + * @param query The query. + * + * @return True, if the rule matches, false otherwise. + */ +static bool cache_rule_matches_column(CACHE_RULE *self, const char *default_db, const GWBUF *query) +{ + ss_dassert(self->attribute == CACHE_ATTRIBUTE_COLUMN); + + bool matches = false; + + switch (self->op) + { + case CACHE_OP_EQ: + case CACHE_OP_NEQ: + matches = cache_rule_matches_column_simple(self, default_db, query); + break; + + case CACHE_OP_LIKE: + case CACHE_OP_UNLIKE: + matches = cache_rule_matches_column_regexp(self, default_db, query); + break; + + default: + ss_dassert(!true); + } + return matches; } @@ -912,8 +1376,9 @@ static bool cache_rule_matches_database(CACHE_RULE *self, const char *default_db bool matches = false; + bool fullnames = true; int n; - char **names = qc_get_database_names((GWBUF*)query, &n); // TODO: Make qc const-correct. + char **names = qc_get_table_names((GWBUF*)query, &n, fullnames); // TODO: Make qc const-correct. if (names) { @@ -921,20 +1386,32 @@ static bool cache_rule_matches_database(CACHE_RULE *self, const char *default_db while (!matches && (i < n)) { - matches = cache_rule_compare(self, names[i]); + char *name = names[i]; + char *dot = strchr(name, '.'); + const char *database = NULL; + + if (dot) + { + *dot = 0; + database = name; + } + else + { + database = default_db; + } + + matches = cache_rule_compare(self, database); + + MXS_FREE(name); ++i; } - for (int i = 0; i < n; ++i) + while (i < n) { - MXS_FREE(names[i]); + MXS_FREE(names[i++]); } - MXS_FREE(names); - } - if (!matches && default_db) - { - matches = cache_rule_compare(self, default_db); + MXS_FREE(names); } return matches; @@ -962,6 +1439,174 @@ static bool cache_rule_matches_query(CACHE_RULE *self, const char *default_db, c return cache_rule_compare_n(self, sql, len); } +/** + * Returns boolean indicating whether the table regexp rule matches the query or not. + * + * @param self The CACHE_RULE object. + * @param default_db The current default db. + * @param query The query. + * + * @return True, if the rule matches, false otherwise. + */ +static bool cache_rule_matches_table_regexp(CACHE_RULE *self, const char *default_db, const GWBUF *query) +{ + ss_dassert(self->attribute == CACHE_ATTRIBUTE_TABLE); + ss_dassert((self->op == CACHE_OP_LIKE) || (self->op == CACHE_OP_UNLIKE)); + + bool matches = false; + + int n; + char **names; + bool fullnames; + + fullnames = true; + names = qc_get_table_names((GWBUF*)query, &n, fullnames); + + if (names) + { + size_t default_db_len = default_db ? strlen(default_db) : 0; + + int i = 0; + while (!matches && (i < n)) + { + char *name = names[i]; + char *dot = strchr(name, '.'); + + if (!dot) + { + // Only "tbl" + + if (default_db) + { + char buffer[default_db_len + 1 + strlen(name) + 1]; + + strcpy(name, default_db); + strcpy(name + default_db_len, "."); + strcpy(name + default_db_len + 1, name); + + matches = cache_rule_compare(self, name); + } + else + { + matches = cache_rule_compare(self, name); + } + + MXS_FREE(names[i]); + } + else + { + // A qualified name "db.tbl". + matches = cache_rule_compare(self, name); + } + + ++i; + } + + if (i < n) + { + MXS_FREE(names[i]); + ++i; + } + + MXS_FREE(names); + } + else if (self->op == CACHE_OP_UNLIKE) + { + matches = true; + } + + return matches; +} + +/** + * Returns boolean indicating whether the table simple rule matches the query or not. + * + * @param self The CACHE_RULE object. + * @param default_db The current default db. + * @param query The query. + * + * @return True, if the rule matches, false otherwise. + */ +static bool cache_rule_matches_table_simple(CACHE_RULE *self, const char *default_db, const GWBUF *query) +{ + ss_dassert(self->attribute == CACHE_ATTRIBUTE_TABLE); + ss_dassert((self->op == CACHE_OP_EQ) || (self->op == CACHE_OP_NEQ)); + + bool matches = false; + + bool fullnames = false; + + if (self->simple.database) + { + fullnames = true; + } + + int n; + char **names; + + names = qc_get_table_names((GWBUF*)query, &n, fullnames); + + if (names) + { + int i = 0; + while (!matches && (i < n)) + { + char *name = names[i]; + const char *database = NULL; + const char *table = NULL; + + if (fullnames) + { + char *dot = strchr(name, '.'); + + if (dot) + { + *dot = 0; + + database = name; + table = dot + 1; + } + else + { + database = default_db; + table = name; + } + + if (database) + { + matches = + (strcasecmp(self->simple.database, database) == 0) && + (strcasecmp(self->simple.table, table) == 0); + } + } + else + { + table = name; + + matches = (strcasecmp(self->simple.table, table) == 0); + } + + if (self->op == CACHE_OP_NEQ) + { + matches = !matches; + } + + MXS_FREE(name); + ++i; + } + + if (i < n) + { + MXS_FREE(names[i]); + ++i; + } + + MXS_FREE(names); + } + + return matches; +} + /** * Returns boolean indicating whether the table rule matches the query or not. * @@ -977,73 +1622,20 @@ static bool cache_rule_matches_table(CACHE_RULE *self, const char *default_db, c bool matches = false; - int n; - char **names; - bool fullnames; - - fullnames = false; - names = qc_get_table_names((GWBUF*)query, &n, fullnames); - - if (names) + switch (self->op) { - int i = 0; - while (!matches && (i < n)) - { - char *name = names[i]; - matches = cache_rule_compare(self, name); - MXS_FREE(name); - ++i; - } + case CACHE_OP_EQ: + case CACHE_OP_NEQ: + matches = cache_rule_matches_table_simple(self, default_db, query); + break; - if (i < n) - { - MXS_FREE(names[i]); - ++i; - } + case CACHE_OP_LIKE: + case CACHE_OP_UNLIKE: + matches = cache_rule_matches_table_regexp(self, default_db, query); + break; - MXS_FREE(names); - - if (!matches) - { - fullnames = true; - names = qc_get_table_names((GWBUF*)query, &n, fullnames); - - size_t default_db_len = default_db ? strlen(default_db) : 0; - i = 0; - - while (!matches && (i < n)) - { - char *name = names[i]; - char *dot = strchr(name, '.'); - - if (!dot) - { - if (default_db) - { - name = (char*)MXS_MALLOC(default_db_len + 1 + strlen(name) + 1); - - strcpy(name, default_db); - strcpy(name + default_db_len, "."); - strcpy(name + default_db_len + 1, names[i]); - - MXS_FREE(names[i]); - names[i] = name; - } - } - - matches = cache_rule_compare(self, name); - MXS_FREE(name); - ++i; - } - - if (i < n) - { - MXS_FREE(names[i]); - ++i; - } - - MXS_FREE(names); - } + default: + ss_dassert(!true); } return matches; @@ -1063,7 +1655,6 @@ static bool cache_rule_matches_user(CACHE_RULE *self, const char *account) bool matches = cache_rule_compare(self, account); - if ((matches && (self->debug & CACHE_DEBUG_MATCHING)) || (!matches && (self->debug & CACHE_DEBUG_NON_MATCHING))) { diff --git a/server/modules/filter/cache/rules.h b/server/modules/filter/cache/rules.h index 667ad344b..703c24d91 100644 --- a/server/modules/filter/cache/rules.h +++ b/server/modules/filter/cache/rules.h @@ -48,8 +48,14 @@ typedef struct cache_rule char *value; // The value from the rule file. struct { - pcre2_code* code; - pcre2_match_data* data; + char *database; + char *table; + char *column; + } simple; // Details, only for CACHE_OP_[EQ|NEQ] + struct + { + pcre2_code *code; + pcre2_match_data *data; } regexp; // Regexp data, only for CACHE_OP_[LIKE|UNLIKE]. uint32_t debug; // The debug level. struct cache_rule *next; diff --git a/server/modules/filter/cache/test/testrules.c b/server/modules/filter/cache/test/testrules.c index 153f342a5..601d0e2bc 100644 --- a/server/modules/filter/cache/test/testrules.c +++ b/server/modules/filter/cache/test/testrules.c @@ -132,8 +132,46 @@ struct store_test_case // false: The query should NOT match the rule. const struct store_test_case store_test_cases[] = { - STORE_TEST_CASE("column", "=", "a", true, NULL, "SELECT a FROM tbl"), - STORE_TEST_CASE("column", "=", "b", false, NULL, "SELECT a FROM tbl") + STORE_TEST_CASE("column", "=", "a", true, NULL, "SELECT a FROM tbl"), + STORE_TEST_CASE("column", "!=", "a", false, NULL, "SELECT a FROM tbl"), + STORE_TEST_CASE("column", "=", "b", false, NULL, "SELECT a FROM tbl"), + STORE_TEST_CASE("column", "!=", "b", true, NULL, "SELECT a FROM tbl"), + STORE_TEST_CASE("column", "=", "tbl.a", true, NULL, "SELECT a FROM tbl"), + STORE_TEST_CASE("column", "=", "tbl.a", true, NULL, "SELECT tbl.a FROM tbl"), + + STORE_TEST_CASE("column", "like", ".*a", true, NULL, "SELECT a from tbl"), + STORE_TEST_CASE("column", "like", ".*a", true, NULL, "SELECT tbl.a from tbl"), + STORE_TEST_CASE("column", "like", ".*a", true, NULL, "SELECT db.tbl.a from tbl"), + STORE_TEST_CASE("column", "like", ".*aa", false, NULL, "SELECT a from tbl"), + STORE_TEST_CASE("column", "like", ".*aa", false, NULL, "SELECT tbl.a from tbl"), + STORE_TEST_CASE("column", "like", ".*aa", false, NULL, "SELECT db.tbl.a from tbl"), + STORE_TEST_CASE("column", "unlike", ".*aa", true, NULL, "SELECT a from tbl"), + STORE_TEST_CASE("column", "unlike", ".*aa", true, NULL, "SELECT tbl.a from tbl"), + STORE_TEST_CASE("column", "unlike", ".*aa", true, NULL, "SELECT db.tbl.a from tbl"), + + STORE_TEST_CASE("table", "=", "tbl", true, NULL, "SELECT a FROM tbl"), + STORE_TEST_CASE("table", "!=", "tbl", false, NULL, "SELECT a FROM tbl"), + STORE_TEST_CASE("table", "=", "tbl2", false, NULL, "SELECT a FROM tbl"), + STORE_TEST_CASE("table", "!=", "tbl2", true, NULL, "SELECT a FROM tbl"), + STORE_TEST_CASE("table", "=", "db.tbl", true, NULL, "SELECT a from db.tbl"), + STORE_TEST_CASE("table", "=", "db.tbl", true, "db", "SELECT a from tbl"), + STORE_TEST_CASE("table", "!=", "db.tbl", false, NULL, "SELECT a from db.tbl"), + STORE_TEST_CASE("table", "!=", "db.tbl", false, "db", "SELECT a from tbl"), + + STORE_TEST_CASE("database", "=", "db", false, NULL, "SELECT a FROM tbl"), + STORE_TEST_CASE("database", "!=", "db", true, NULL, "SELECT a FROM tbl"), + STORE_TEST_CASE("database", "=", "db1", true, NULL, "SELECT a FROM db1.tbl"), + STORE_TEST_CASE("database", "!=", "db1", false, NULL, "SELECT a FROM db1.tbl"), + STORE_TEST_CASE("database", "=", "db1", true, "db1", "SELECT a FROM tbl"), + STORE_TEST_CASE("database", "!=", "db1", false, "db1", "SELECT a FROM tbl"), + + STORE_TEST_CASE("query", "=", "SELECT a FROM tbl", true, NULL, "SELECT a FROM tbl"), + STORE_TEST_CASE("query", "!=", "SELECT a FROM tbl", false, NULL, "SELECT a FROM tbl"), + STORE_TEST_CASE("query", "=", "SELECT b FROM tbl", false, NULL, "SELECT a FROM tbl"), + STORE_TEST_CASE("query", "!=", "SELECT b FROM tbl", true, NULL, "SELECT a FROM tbl"), + + STORE_TEST_CASE("column", "=", "a", false, NULL, "SELECT b FROM tbl WHERE a = 5"), + STORE_TEST_CASE("column", "=", "a", true, NULL, "SELECT a, b FROM tbl WHERE a = 5"), }; const size_t n_store_test_cases = sizeof(store_test_cases) / sizeof(store_test_cases[0]); @@ -144,6 +182,7 @@ int test_store() for (int i = 0; i < n_store_test_cases; ++i) { + printf("TC : %d\n", i + 1); const struct store_test_case *test_case = &store_test_cases[i]; CACHE_RULES *rules = cache_rules_parse(test_case->rule, 0); @@ -160,10 +199,12 @@ int test_store() { printf("Query : %s\n" "Rule : %s\n" + "Def-db : %s\n" "Expected: %s\n" "Result : %s\n\n", test_case->query, test_case->rule, + test_case->default_db, test_case->matches ? "A match" : "Not a match", matches ? "A match" : "Not a match"); }