Improve insert value checks in insertstream

The filter now checks that the fields are implicitly listed instead of
explicitly defined.
This commit is contained in:
Markus Mäkelä
2017-01-13 10:05:30 +02:00
parent 95ffa1c689
commit 4869eccdc4

View File

@ -142,7 +142,7 @@ void free_instance(DS_INSTANCE *instance)
* This the SQL command that starts the streaming
*/
static const char load_data_template[] = "LOAD DATA LOCAL INFILE 'maxscale.data' "
"INTO TABLE %s FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'";
"INTO TABLE %s FIELDS TERMINATED BY ',' LINES TERMINATED BY '\\n'";
/**
* Create an instance of the filter for a particular service
@ -272,6 +272,7 @@ routeQuery(FILTER *instance, void *session, GWBUF *queue)
bool send_ok = false;
bool send_error = false;
int rc = 0;
ss_dassert(GWBUF_IS_CONTIGUOUS(queue));
if (session_trx_is_active(my_session->client_dcb->session) &&
extract_insert_target(queue, target, sizeof(target)))
@ -477,6 +478,39 @@ static uint64_t getCapabilities(void)
return RCAP_TYPE_TRANSACTION_TRACKING;
}
static bool only_implicit_values(GWBUF *buffer)
{
bool rval = false;
char *data = (char*)GWBUF_DATA(buffer);
char *ptr = strnchr_esc_mysql(data + 5, '(', GWBUF_LENGTH(buffer));
if (ptr && (ptr = strnchr_esc_mysql(ptr, ')', GWBUF_LENGTH(buffer) - (ptr - data))))
{
/** Skip the closing parenthesis and any whitespace */
ptr++;
while (isspace(*ptr) && ptr < (char*)buffer->end)
{
ptr++;
}
if (!isalnum(*ptr) && ptr < (char*)buffer->end)
{
/**
* The first pair of parentheses was followed by a non-alphanumeric
* character. We can be fairly certain that the INSERT statement
* implicitly defines the order of the column values as is done in
* the following example:
*
* INSERT INTO test.t1 VALUES (1, "hello"), (2, "world");
*/
rval = true;
}
}
return rval;
}
/**
* Check if a buffer contains an insert statement
*
@ -486,23 +520,28 @@ static uint64_t getCapabilities(void)
static bool extract_insert_target(GWBUF *buffer, char* target, int len)
{
bool rval = false;
int n_tables = 0;
char **tables = qc_get_table_names(buffer, &n_tables, true);
if (n_tables == 1)
if (qc_get_operation(buffer) == QUERY_OP_INSERT &&
only_implicit_values(buffer))
{
/** Only one table in an insert */
snprintf(target, len, "%s", tables[0]);
rval = true;
}
int n_tables = 0;
char **tables = qc_get_table_names(buffer, &n_tables, true);
if (tables)
{
for (int i = 0; i < (size_t)n_tables; ++i)
if (n_tables == 1)
{
MXS_FREE(tables[i]);
/** Only one table in an insert */
snprintf(target, len, "%s", tables[0]);
rval = true;
}
if (tables)
{
for (int i = 0; i < (size_t)n_tables; ++i)
{
MXS_FREE(tables[i]);
}
MXS_FREE(tables);
}
MXS_FREE(tables);
}
return rval;