Changed variable type which includes information of query type returned by query classifier.
As a consequence, if autocommit is enabled, active transaction(s) are implicitly committed and MaxScale detects that implicit commit.
This commit is contained in:
parent
fcf7d37114
commit
288ca68677
@ -94,13 +94,13 @@ skygw_query_type_t skygw_query_classifier_get_type(
|
||||
const char* query,
|
||||
unsigned long client_flags)
|
||||
{
|
||||
MYSQL* mysql;
|
||||
char* query_str;
|
||||
const char* user = "skygw";
|
||||
const char* db = "skygw";
|
||||
THD* thd;
|
||||
MYSQL* mysql;
|
||||
char* query_str;
|
||||
const char* user = "skygw";
|
||||
const char* db = "skygw";
|
||||
THD* thd;
|
||||
skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN;
|
||||
bool failp = FALSE;
|
||||
bool failp = FALSE;
|
||||
|
||||
ss_info_dassert(query != NULL, ("query_str is NULL"));
|
||||
|
||||
@ -343,9 +343,9 @@ return_here:
|
||||
* restrictive, for example, QUERY_TYPE_READ is smaller than QUERY_TYPE_WRITE.
|
||||
*
|
||||
*/
|
||||
static skygw_query_type_t set_query_type(
|
||||
skygw_query_type_t* qtype,
|
||||
skygw_query_type_t new_type)
|
||||
static u_int8_t set_query_type(
|
||||
u_int8_t* qtype,
|
||||
u_int8_t new_type)
|
||||
{
|
||||
*qtype = MAX(*qtype, new_type);
|
||||
return *qtype;
|
||||
@ -371,8 +371,9 @@ static skygw_query_type_t resolve_query_type(
|
||||
THD* thd)
|
||||
{
|
||||
skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN;
|
||||
LEX* lex;
|
||||
Item* item;
|
||||
u_int8_t type = QUERY_TYPE_UNKNOWN;
|
||||
LEX* lex;
|
||||
Item* item;
|
||||
/**
|
||||
* By default, if sql_log_bin, that is, recording data modifications
|
||||
* to binary log, is disabled, gateway treats operations normally.
|
||||
@ -389,8 +390,8 @@ static skygw_query_type_t resolve_query_type(
|
||||
|
||||
/** SELECT ..INTO variable|OUTFILE|DUMPFILE */
|
||||
if (lex->result != NULL) {
|
||||
qtype = QUERY_TYPE_SESSION_WRITE;
|
||||
goto return_qtype;
|
||||
type = QUERY_TYPE_SESSION_WRITE;
|
||||
goto return_qtype;
|
||||
}
|
||||
/**
|
||||
* 1:ALTER TABLE, TRUNCATE, REPAIR, OPTIMIZE, ANALYZE, CHECK.
|
||||
@ -402,17 +403,17 @@ static skygw_query_type_t resolve_query_type(
|
||||
* CREATE SPFUNCTION, INSTALL|UNINSTALL PLUGIN
|
||||
*/
|
||||
if (is_log_table_write_query(lex->sql_command) ||
|
||||
is_update_query(lex->sql_command))
|
||||
is_update_query(lex->sql_command))
|
||||
{
|
||||
if (thd->variables.sql_log_bin == 0 &&
|
||||
force_data_modify_op_replication)
|
||||
{
|
||||
qtype = QUERY_TYPE_SESSION_WRITE;
|
||||
} else {
|
||||
qtype = QUERY_TYPE_WRITE;
|
||||
}
|
||||
if (thd->variables.sql_log_bin == 0 &&
|
||||
force_data_modify_op_replication)
|
||||
{
|
||||
type |= QUERY_TYPE_SESSION_WRITE;
|
||||
} else {
|
||||
type |= QUERY_TYPE_WRITE;
|
||||
}
|
||||
|
||||
goto return_qtype;
|
||||
goto return_qtype;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -422,56 +423,56 @@ static skygw_query_type_t resolve_query_type(
|
||||
if (sql_command_flags[lex->sql_command] & CF_AUTO_COMMIT_TRANS) {
|
||||
if (lex->option_type == OPT_GLOBAL)
|
||||
{
|
||||
qtype = QUERY_TYPE_GLOBAL_WRITE;
|
||||
type |= (QUERY_TYPE_GLOBAL_WRITE|QUERY_TYPE_COMMIT);
|
||||
}
|
||||
else
|
||||
{
|
||||
qtype = QUERY_TYPE_SESSION_WRITE;
|
||||
type |= (QUERY_TYPE_SESSION_WRITE|QUERY_TYPE_COMMIT);
|
||||
}
|
||||
goto return_qtype;
|
||||
}
|
||||
|
||||
/** Try to catch session modifications here */
|
||||
switch (lex->sql_command) {
|
||||
case SQLCOM_SET_OPTION:
|
||||
if (lex->option_type == OPT_GLOBAL)
|
||||
{
|
||||
qtype = QUERY_TYPE_GLOBAL_WRITE;
|
||||
break;
|
||||
}
|
||||
case SQLCOM_SET_OPTION:
|
||||
if (lex->option_type == OPT_GLOBAL)
|
||||
{
|
||||
type |= QUERY_TYPE_GLOBAL_WRITE;
|
||||
break;
|
||||
}
|
||||
/**<! fall through */
|
||||
case SQLCOM_CHANGE_DB:
|
||||
qtype = QUERY_TYPE_SESSION_WRITE;
|
||||
break;
|
||||
case SQLCOM_CHANGE_DB:
|
||||
type |= QUERY_TYPE_SESSION_WRITE;
|
||||
break;
|
||||
|
||||
case SQLCOM_SELECT:
|
||||
qtype = QUERY_TYPE_READ;
|
||||
break;
|
||||
case SQLCOM_SELECT:
|
||||
type |= QUERY_TYPE_READ;
|
||||
break;
|
||||
|
||||
case SQLCOM_CALL:
|
||||
qtype = QUERY_TYPE_WRITE;
|
||||
break;
|
||||
case SQLCOM_CALL:
|
||||
type |= QUERY_TYPE_WRITE;
|
||||
break;
|
||||
|
||||
case SQLCOM_BEGIN:
|
||||
type |= QUERY_TYPE_BEGIN_TRX;
|
||||
goto return_qtype;
|
||||
break;
|
||||
|
||||
case SQLCOM_BEGIN:
|
||||
qtype = QUERY_TYPE_BEGIN_TRX;
|
||||
goto return_qtype;
|
||||
break;
|
||||
|
||||
case SQLCOM_COMMIT:
|
||||
qtype = QUERY_TYPE_COMMIT;
|
||||
goto return_qtype;
|
||||
break;
|
||||
|
||||
case SQLCOM_ROLLBACK:
|
||||
qtype = QUERY_TYPE_ROLLBACK;
|
||||
goto return_qtype;
|
||||
break;
|
||||
|
||||
default:
|
||||
break;
|
||||
case SQLCOM_COMMIT:
|
||||
type |= QUERY_TYPE_COMMIT;
|
||||
goto return_qtype;
|
||||
break;
|
||||
|
||||
case SQLCOM_ROLLBACK:
|
||||
type |= QUERY_TYPE_ROLLBACK;
|
||||
goto return_qtype;
|
||||
break;
|
||||
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
if (QTYPE_LESS_RESTRICTIVE_THAN_WRITE(qtype)) {
|
||||
if (QTYPE_LESS_RESTRICTIVE_THAN_WRITE(type)) {
|
||||
/**
|
||||
* These values won't change qtype more restrictive than write.
|
||||
* UDFs and procedures could possibly cause session-wide write,
|
||||
@ -500,8 +501,7 @@ static skygw_query_type_t resolve_query_type(
|
||||
if (itype == Item::SUBSELECT_ITEM) {
|
||||
continue;
|
||||
} else if (itype == Item::FUNC_ITEM) {
|
||||
skygw_query_type_t
|
||||
func_qtype = QUERY_TYPE_UNKNOWN;
|
||||
int func_qtype = QUERY_TYPE_UNKNOWN;
|
||||
/**
|
||||
* Item types:
|
||||
* FIELD_ITEM = 0, FUNC_ITEM,
|
||||
@ -554,7 +554,7 @@ static skygw_query_type_t resolve_query_type(
|
||||
* An unknown (for maxscale) function / sp
|
||||
* belongs to this category.
|
||||
*/
|
||||
func_qtype = QUERY_TYPE_WRITE;
|
||||
func_qtype |= QUERY_TYPE_WRITE;
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [resolve_query_type] "
|
||||
@ -564,7 +564,7 @@ static skygw_query_type_t resolve_query_type(
|
||||
pthread_self())));
|
||||
break;
|
||||
case Item_func::UDF_FUNC:
|
||||
func_qtype = QUERY_TYPE_WRITE;
|
||||
func_qtype |= QUERY_TYPE_WRITE;
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [resolve_query_type] "
|
||||
@ -574,7 +574,7 @@ static skygw_query_type_t resolve_query_type(
|
||||
break;
|
||||
case Item_func::NOW_FUNC:
|
||||
case Item_func::GSYSVAR_FUNC:
|
||||
func_qtype = QUERY_TYPE_LOCAL_READ;
|
||||
func_qtype |= QUERY_TYPE_LOCAL_READ;
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [resolve_query_type] "
|
||||
@ -583,7 +583,7 @@ static skygw_query_type_t resolve_query_type(
|
||||
pthread_self())));
|
||||
break;
|
||||
case Item_func::UNKNOWN_FUNC:
|
||||
func_qtype = QUERY_TYPE_READ;
|
||||
func_qtype |= QUERY_TYPE_READ;
|
||||
/**
|
||||
* Many built-in functions are of this
|
||||
* type, for example, rand(), soundex(),
|
||||
@ -607,17 +607,18 @@ static skygw_query_type_t resolve_query_type(
|
||||
break;
|
||||
} /**< switch */
|
||||
/**< Set new query type */
|
||||
qtype = set_query_type(&qtype, func_qtype);
|
||||
type |= set_query_type(&type, func_qtype);
|
||||
}
|
||||
/**
|
||||
* Write is as restrictive as it gets due functions,
|
||||
* so break.
|
||||
*/
|
||||
if (qtype == QUERY_TYPE_WRITE) {
|
||||
if ((type & QUERY_TYPE_WRITE) == QUERY_TYPE_WRITE) {
|
||||
break;
|
||||
}
|
||||
} /**< for */
|
||||
} /**< if */
|
||||
return_qtype:
|
||||
qtype = (skygw_query_type_t)type;
|
||||
return qtype;
|
||||
}
|
||||
|
@ -29,18 +29,18 @@ EXTERN_C_BLOCK_BEGIN
|
||||
* is modified
|
||||
*/
|
||||
typedef enum {
|
||||
QUERY_TYPE_UNKNOWN = 7, /*< Couln't find out or parse error */
|
||||
QUERY_TYPE_LOCAL_READ, /*< Read non-database data, execute in MaxScale */
|
||||
QUERY_TYPE_READ, /*< No updates */
|
||||
QUERY_TYPE_WRITE, /*< Master data will be modified */
|
||||
QUERY_TYPE_SESSION_WRITE,/*< Session data will be modified */
|
||||
QUERY_TYPE_GLOBAL_WRITE, /*< Global system variable modification */
|
||||
QUERY_TYPE_BEGIN_TRX, /*< BEGIN or START TRANSACTION */
|
||||
QUERY_TYPE_ROLLBACK, /*< ROLLBACK */
|
||||
QUERY_TYPE_COMMIT /*< COMMIT */
|
||||
QUERY_TYPE_UNKNOWN = 0, /*< Couln't find out or parse error */
|
||||
QUERY_TYPE_LOCAL_READ = (1<<0), /*< Read non-database data, execute in MaxScale */
|
||||
QUERY_TYPE_READ = (1<<1), /*< No updates */
|
||||
QUERY_TYPE_WRITE = (1<<2), /*< Master data will be modified */
|
||||
QUERY_TYPE_SESSION_WRITE = (1<<3), /*< Session data will be modified */
|
||||
QUERY_TYPE_GLOBAL_WRITE = (1<<4), /*< Global system variable modification */
|
||||
QUERY_TYPE_BEGIN_TRX = (1<<5), /*< BEGIN or START TRANSACTION */
|
||||
QUERY_TYPE_ROLLBACK = (1<<6), /*< ROLLBACK */
|
||||
QUERY_TYPE_COMMIT = (1<<7), /*< COMMIT */
|
||||
} skygw_query_type_t;
|
||||
|
||||
|
||||
#define QUERY_IS_TYPE(mask,type) ((mask & type) == type)
|
||||
|
||||
skygw_query_type_t skygw_query_classifier_get_type(
|
||||
const char* query_str,
|
||||
|
@ -724,6 +724,13 @@ static int routeQuery(
|
||||
break;
|
||||
|
||||
case QUERY_TYPE_SESSION_WRITE:
|
||||
case (QUERY_TYPE_SESSION_WRITE|QUERY_TYPE_COMMIT):
|
||||
if (QUERY_IS_TYPE(qtype,QUERY_TYPE_COMMIT) &&
|
||||
transaction_active)
|
||||
)
|
||||
{
|
||||
transaction_active = false;
|
||||
}
|
||||
/**
|
||||
* Execute in backends used by current router session.
|
||||
* Save session variable commands to router session property
|
||||
@ -759,7 +766,7 @@ static int routeQuery(
|
||||
int rc2;
|
||||
|
||||
rc = master_dcb->func.write(master_dcb, gwbuf_clone(querybuf));
|
||||
rc2 = slave_dcb->func.write(slave_dcb, gwbuf_clone(querybuf));
|
||||
rc2 = slave_dcb->func.write(slave_dcb, querybuf);
|
||||
|
||||
if (rc == 1 && rc == rc2)
|
||||
{
|
||||
@ -869,6 +876,10 @@ return_ret:
|
||||
{
|
||||
gwbuf_free(plainsqlbuf);
|
||||
}
|
||||
if (querystr != NULL)
|
||||
{
|
||||
free(querystr);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user