MXS-1196: Handle sequence related fields/functions

Both 10.3 and Oracle support sequence pseudo colums and corresponding
functions. Getting the next number in the sequence is in both cases
obtained using nextval/nextval() but the current number is in Oracle
obtained using currval/currval() and in 10.3 using lastval/lastval().

These fields/functions are now ignored, in the sense that they will
not show up in the field/function infos. However, they will cause the
type mask of the statement to contain the bit QUERY_TYPE_WRITE so that
statements accessing the sequence will always be sent to the master.
This commit is contained in:
Johan Wikman
2017-05-23 10:04:40 +03:00
parent 61b265467b
commit 9ecd5da255
2 changed files with 130 additions and 14 deletions

View File

@ -180,6 +180,10 @@ static QC_SQLITE_INFO* info_alloc(uint32_t collect);
static void info_finish(QC_SQLITE_INFO* info);
static void info_free(QC_SQLITE_INFO* info);
static QC_SQLITE_INFO* info_init(QC_SQLITE_INFO* info, uint32_t collect);
static bool is_sequence_related_field(const char* database,
const char* table,
const char* column);
static bool is_sequence_related_function(const char* func_name);
static void log_invalid_data(GWBUF* query, const char* message);
static const char* map_function_name(const char* name);
static bool parse_query(GWBUF* query, uint32_t collect);
@ -658,6 +662,57 @@ static bool query_is_parsed(GWBUF* query, uint32_t collect)
return rc;
}
/**
* Returns whether a field is sequence related.
*
* @param database The database/schema or NULL.
* @param table The table or NULL.
* @param column The column.
*
* @return True, if the field is sequence related, false otherwise.
*/
static bool is_sequence_related_field(const char* database,
const char* table,
const char* column)
{
return is_sequence_related_function(column);
}
/**
* Returns whether a function is sequence related.
*
* @param func_name A function.
*
* @return True, if the function is sequence related, false otherwise.
*/
static bool is_sequence_related_function(const char* func_name)
{
bool rv = false;
if (this_unit.sql_mode == QC_SQL_MODE_ORACLE)
{
// In Oracle mode we ignore the pseudocolumns "currval" and "nextval".
// We also exclude "lastval", the 10.3 equivalent of "currval".
if ((strcasecmp(func_name, "currval") == 0) ||
(strcasecmp(func_name, "nextval") == 0) ||
(strcasecmp(func_name, "lastval") == 0))
{
rv = true;
}
}
if (!rv && (this_unit.parse_as == QC_PARSE_AS_103))
{
if ((strcasecmp(func_name, "lastval") == 0) ||
(strcasecmp(func_name, "nextval") == 0))
{
rv = true;
}
}
return rv;
}
/**
* Logs information about invalid data.
*
@ -759,6 +814,12 @@ static void update_field_info(QC_SQLITE_INFO* info,
{
ss_dassert(column);
if (is_sequence_related_field(database, table, column))
{
info->type_mask |= QUERY_TYPE_WRITE;
return;
}
if (!(info->collect & QC_COLLECT_FIELDS) || (info->collected & QC_COLLECT_FIELDS))
{
// If field information should not be collected, or if field information
@ -1065,6 +1126,8 @@ static void update_field_infos(QC_SQLITE_INFO* info,
{
const char* zToken = pExpr->u.zToken;
bool ignore_exprlist = false;
switch (pExpr->op)
{
case TK_ASTERISK: // select *
@ -1187,6 +1250,11 @@ static void update_field_infos(QC_SQLITE_INFO* info,
{
info->type_mask |= (QUERY_TYPE_READ | QUERY_TYPE_MASTER_READ);
}
else if (is_sequence_related_function(zToken))
{
info->type_mask |= QUERY_TYPE_WRITE;
ignore_exprlist = true;
}
else if (!is_builtin_readonly_function(zToken, this_unit.sql_mode == QC_SQL_MODE_ORACLE))
{
info->type_mask |= QUERY_TYPE_WRITE;
@ -1194,7 +1262,7 @@ static void update_field_infos(QC_SQLITE_INFO* info,
// We exclude "row", because we cannot detect all rows the same
// way qc_mysqlembedded does.
if (strcasecmp(zToken, "row") != 0)
if (!ignore_exprlist && (strcasecmp(zToken, "row") != 0))
{
update_function_info(info, zToken, usage);
}
@ -1227,7 +1295,10 @@ static void update_field_infos(QC_SQLITE_INFO* info,
case TK_BETWEEN:
case TK_CASE:
case TK_FUNCTION:
update_field_infos_from_exprlist(info, pExpr->x.pList, usage, pExclude);
if (!ignore_exprlist)
{
update_field_infos_from_exprlist(info, pExpr->x.pList, usage, pExclude);
}
break;
case TK_EXISTS: