Bug #418, added functions to query classifier to detect if SET autocommit is called.

Note: this compiles but doesn't work yet properly.
This commit is contained in:
VilhoRaatikka 2014-04-09 23:43:03 +03:00
parent 5e3ec5b3c8
commit 2c17dc3edf
4 changed files with 294 additions and 209 deletions

View File

@ -50,6 +50,7 @@
#include <sql_parse.h>
#include <errmsg.h>
#include <client_settings.h>
#include <set_var.h>
#include <item_func.h>
@ -73,9 +74,17 @@ static bool create_parse_tree(
static skygw_query_type_t resolve_query_type(
THD* thd);
static bool skygw_stmt_causes_implicit_commit(
LEX* lex,
uint mask);
LEX* lex,
bool* autocommit);
static bool skygw_stmt_disables_autocommit(
LEX* lex);
static bool is_autocommit_stmt(
LEX* lex,
bool enable_cmd); /*< true=enable, false=disable */
/**
* @node (write brief function description here)
@ -374,7 +383,8 @@ static skygw_query_type_t resolve_query_type(
THD* thd)
{
skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN;
u_int8_t type = QUERY_TYPE_UNKNOWN;
u_int8_t type = QUERY_TYPE_UNKNOWN;
bool is_set_autocommit = false;;
LEX* lex;
Item* item;
/**
@ -397,7 +407,9 @@ static skygw_query_type_t resolve_query_type(
goto return_qtype;
}
if (skygw_stmt_causes_implicit_commit(lex, CF_AUTO_COMMIT_TRANS))
if (skygw_stmt_causes_implicit_commit(
lex,
&is_set_autocommit))
{
if (LOG_IS_ENABLED(LOGFILE_TRACE))
{
@ -418,7 +430,25 @@ static skygw_query_type_t resolve_query_type(
"next command.");
}
}
if (is_set_autocommit)
{
type |= QUERY_TYPE_ENABLE_AUTOCOMMIT;
}
type |= QUERY_TYPE_COMMIT;
}
if (skygw_stmt_disables_autocommit(lex))
{
if (LOG_IS_ENABLED(LOGFILE_TRACE))
{
skygw_log_write(
LOGFILE_TRACE,
"Disable autocommit : implicit START TRANSACTION"
" before executing the next command.");
}
type |= QUERY_TYPE_DISABLE_AUTOCOMMIT;
type |= QUERY_TYPE_BEGIN_TRX;
}
/**
* REVOKE ALL, ASSIGN_TO_KEYCACHE,
@ -648,11 +678,17 @@ return_qtype:
return qtype;
}
static bool skygw_stmt_causes_implicit_commit(LEX* lex, uint mask)
/**
* Checks if statement causes implicit COMMIT.
* If SET autocommit=1 is called, sets autocommit pointer.
*/
static bool skygw_stmt_causes_implicit_commit(
LEX* lex,
bool* autocommit)
{
bool succp;
if (!(sql_command_flags[lex->sql_command] & mask))
if (!(sql_command_flags[lex->sql_command] & CF_AUTO_COMMIT_TRANS))
{
succp = false;
goto return_succp;
@ -668,13 +704,63 @@ static bool skygw_stmt_causes_implicit_commit(LEX* lex, uint mask)
succp = !(lex->create_info.options & HA_LEX_CREATE_TMP_TABLE);
break;
case SQLCOM_SET_OPTION:
succp = lex->autocommit ? true : false;
succp = is_autocommit_stmt(lex, true);
*autocommit = succp;
break;
default:
succp = true;
break;
}
return_succp:
return succp;
}
/**
* Finds out if SET autocommit=0 was called.
*/
static bool skygw_stmt_disables_autocommit(
LEX* lex)
{
bool succp = false;
if (lex->sql_command == SQLCOM_SET_OPTION)
{
succp = is_autocommit_stmt(lex, false); /*< SET autocommit=0 ? */
goto return_succp;
}
return_succp:
return succp;
}
/**
* Finds out if stmt is SET autocommit
* and if the new value matches with the enable_cmd argument.
*/
static bool is_autocommit_stmt(
LEX* lex,
bool enable_cmd) /*< true=enable, false=disable */
{
struct list_node* node;
set_var* setvar;
bool succp = false;
char c = (enable_cmd ? '1' : '0');
node = lex->var_list.first_node();
while((setvar=(set_var*)node->info) != NULL)
{
if (strcmp(
"autocommit",
((sys_var *)setvar->var)->name.str)
== 0 &&
*((Item *)setvar->value)->name == c)
{
succp = true;
goto return_succp;
}
node = node->next;
}
return_succp:
return succp;
}

View File

@ -29,15 +29,17 @@ EXTERN_C_BLOCK_BEGIN
* is modified
*/
typedef enum {
QUERY_TYPE_UNKNOWN = 0, /*< Couln't find out or parse error */
QUERY_TYPE_LOCAL_READ = (1<<0), /*< Read non-database data, execute in MaxScale */
QUERY_TYPE_READ = (1<<1), /*< No updates */
QUERY_TYPE_WRITE = (1<<2), /*< Master data will be modified */
QUERY_TYPE_SESSION_WRITE = (1<<3), /*< Session data will be modified */
QUERY_TYPE_GLOBAL_WRITE = (1<<4), /*< Global system variable modification */
QUERY_TYPE_BEGIN_TRX = (1<<5), /*< BEGIN or START TRANSACTION */
QUERY_TYPE_ROLLBACK = (1<<6), /*< ROLLBACK */
QUERY_TYPE_COMMIT = (1<<7), /*< COMMIT */
QUERY_TYPE_UNKNOWN = 0x00, /*< Couln't find out or parse error */
QUERY_TYPE_LOCAL_READ = 0x01, /*< Read non-database data, execute in MaxScale */
QUERY_TYPE_READ = 0x02, /*< No updates */
QUERY_TYPE_WRITE = 0x04, /*< Master data will be modified */
QUERY_TYPE_SESSION_WRITE = 0x08, /*< Session data will be modified */
QUERY_TYPE_GLOBAL_WRITE = 0x10, /*< Global system variable modification */
QUERY_TYPE_BEGIN_TRX = 0x20, /*< BEGIN or START TRANSACTION */
QUERY_TYPE_ENABLE_AUTOCOMMIT = 0x30,/*< SET autocommit=1 */
QUERY_TYPE_DISABLE_AUTOCOMMIT = 0x40,/*< SET autocommit=0 */
QUERY_TYPE_ROLLBACK = 0x50, /*< ROLLBACK */
QUERY_TYPE_COMMIT = 0x60 /*< COMMIT */
} skygw_query_type_t;
#define QUERY_IS_TYPE(mask,type) ((mask & type) == type)

View File

@ -86,7 +86,7 @@ static char* server_options[] = {
"--no-defaults",
"--datadir=",
"--language=",
"--skip-innodb",
// "--skip-innodb",
"--default-storage-engine=myisam",
NULL
};
@ -100,7 +100,7 @@ static char* server_groups[] = {
"server",
"server",
"embedded",
"server",
// "server",
"server",
NULL
};

View File

@ -157,6 +157,13 @@ static void tracelog_routed_query(
DCB* dcb,
GWBUF* buf);
static bool route_session_write(
ROUTER_CLIENT_SES* router_client_ses,
GWBUF* querybuf,
ROUTER_INSTANCE* inst,
unsigned char packet_type,
skygw_query_type_t qtype);
static SPINLOCK instlock;
static ROUTER_INSTANCE* instances;
@ -574,10 +581,12 @@ static int routeQuery(
bool rses_is_closed;
rses_property_t* prop;
size_t len;
static bool transaction_active;
/** if false everything goes to master and session commands to slave too */
static bool autocommit_enabled = true;
/** if true everything goes to master and session commands to slave too */
static bool transaction_active = false;
CHK_CLIENT_RSES(router_cli_ses);
/** Dirty read for quick check if router is closed. */
if (router_cli_ses->rses_closed)
@ -669,207 +678,97 @@ static int routeQuery(
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE,
"Packet type\t%s",
STRPACKETTYPE(packet_type))));
if (QUERY_IS_TYPE(qtype,QUERY_TYPE_COMMIT) &&
transaction_active)
/**
* If autocommit is disabled or transaction is explicitly started
* transaction becomes active and master gets all statements until
* transaction is committed and autocommit is enabled again.
*/
if (autocommit_enabled &&
QUERY_IS_TYPE(qtype, QUERY_TYPE_DISABLE_AUTOCOMMIT))
{
autocommit_enabled = false;
if (!transaction_active)
{
transaction_active = true;
}
}
else if (!transaction_active &&
QUERY_IS_TYPE(qtype, QUERY_TYPE_BEGIN_TRX))
{
transaction_active = true;
}
/**
* Explicit COMMIT and ROLLBACK, implicit COMMIT.
*/
if (autocommit_enabled &&
transaction_active &&
QUERY_IS_TYPE(qtype,(QUERY_TYPE_COMMIT|QUERY_TYPE_ROLLBACK)))
{
transaction_active = false;
}
else if (!autocommit_enabled &&
QUERY_IS_TYPE(qtype, QUERY_TYPE_ENABLE_AUTOCOMMIT))
{
autocommit_enabled = true;
transaction_active = false;
}
switch (qtype) {
case QUERY_TYPE_WRITE:
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"%lu [routeQuery:rwsplit] Query type\t%s, "
"routing to Master.",
pthread_self(),
STRQTYPE(qtype))));
LOGIF(LT, tracelog_routed_query(router_cli_ses,
"routeQuery",
master_dcb,
gwbuf_clone(querybuf)));
ret = master_dcb->func.write(master_dcb, querybuf);
atomic_add(&inst->stats.n_master, 1);
goto return_ret;
break;
case QUERY_TYPE_READ:
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"%lu [routeQuery:rwsplit] Query type\t%s, "
"routing to %s.",
pthread_self(),
STRQTYPE(qtype),
(transaction_active ? "Master" : "Slave"))));
LOGIF(LT, tracelog_routed_query(
router_cli_ses,
"routeQuery",
(transaction_active ? master_dcb : slave_dcb),
gwbuf_clone(querybuf)));
if (transaction_active)
{
ret = master_dcb->func.write(master_dcb, querybuf);
}
else
{
ret = slave_dcb->func.write(slave_dcb, querybuf);
}
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"%lu [routeQuery:rwsplit] Routed.",
pthread_self())));
atomic_add(&inst->stats.n_slave, 1);
goto return_ret;
break;
case QUERY_TYPE_SESSION_WRITE:
/**
* Execute in backends used by current router session.
* Save session variable commands to router session property
* struct. Thus, they can be replayed in backends which are
* started and joined later.
*
* Suppress redundant OK packets sent by backends.
*
* DOES THIS ALL APPLY TO COM_QUIT AS WELL??
*
* The first OK packet is replied to the client.
*
*/
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"%lu [routeQuery:rwsplit] DCB M:%p s:%p, "
"Query type\t%s, "
"packet type %s, routing to all servers.",
pthread_self(),
master_dcb,
slave_dcb,
STRQTYPE(qtype),
STRPACKETTYPE(packet_type))));
/**
* COM_QUIT is one-way message. Server doesn't respond to that.
* Therefore reply processing is unnecessary and session
* command property is not needed. It is just routed to both
* backends.
*/
if (packet_type == COM_QUIT)
{
int rc;
int rc2;
rc = master_dcb->func.write(master_dcb, gwbuf_clone(querybuf));
rc2 = slave_dcb->func.write(slave_dcb, querybuf);
if (rc == 1 && rc == rc2)
{
ret = 1;
}
goto return_ret;
}
prop = rses_property_init(RSES_PROP_TYPE_SESCMD);
/**
* Additional reference is created to querybuf to
* prevent it from being released before properties
* are cleaned up as a part of router sessionclean-up.
*/
mysql_sescmd_init(prop, querybuf, packet_type, router_cli_ses);
/** Lock router session */
if (!rses_begin_locked_router_action(router_cli_ses))
{
rses_property_done(prop);
goto return_ret;
}
/** Add sescmd property to router client session */
rses_property_add(router_cli_ses, prop);
/** Execute session command in master */
if (execute_sescmd_in_backend(router_cli_ses, BE_MASTER))
{
ret = 1;
}
else
{
/** Log error */
}
/** Execute session command in slave */
if (execute_sescmd_in_backend(router_cli_ses, BE_SLAVE))
/**
* Session update is always routed in the same way.
*/
if (QUERY_IS_TYPE(qtype, QUERY_TYPE_SESSION_WRITE))
{
if (route_session_write(
router_cli_ses,
querybuf,
inst,
packet_type,
qtype))
{
ret = 1;
}
else
else
{
/** Log error */
ret = 0;
}
/** Unlock router session */
rses_end_locked_router_action(router_cli_ses);
atomic_add(&inst->stats.n_all, 1);
goto return_ret;
break;
case QUERY_TYPE_BEGIN_TRX:
transaction_active = true;
LOGIF(LT, tracelog_routed_query(router_cli_ses,
"routeQuery",
master_dcb,
gwbuf_clone(querybuf)));
ret = master_dcb->func.write(master_dcb, querybuf);
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"%lu [routeQuery:rwsplit] Routed.",
pthread_self())));
atomic_add(&inst->stats.n_master, 1);
goto return_ret;
break;
case QUERY_TYPE_COMMIT:
case QUERY_TYPE_ROLLBACK:
transaction_active = false;
LOGIF(LT, tracelog_routed_query(router_cli_ses,
"routeQuery",
master_dcb,
gwbuf_clone(querybuf)));
ret = master_dcb->func.write(master_dcb, querybuf);
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"%lu [routeQuery:rwsplit] Routed.",
pthread_self())));
atomic_add(&inst->stats.n_master, 1);
goto return_ret;
break;
default:
}
else if (transaction_active) /*< all to master */
{
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"%lu [routeQuery:rwsplit] Query type\t%s, "
"routing to Master by default.",
pthread_self(),
STRQTYPE(qtype))));
/**
* Is this really ok?
* What is not known is routed to master.
*/
LOGIF(LT, tracelog_routed_query(router_cli_ses,
"routeQuery",
master_dcb,
gwbuf_clone(querybuf)));
LOGFILE_TRACE,
"Transaction is active, routing to Master.")));
ret = master_dcb->func.write(master_dcb, querybuf);
atomic_add(&inst->stats.n_master, 1);
goto return_ret;
break;
} /*< switch by query type */
}
else if (QUERY_IS_TYPE(
qtype,
(QUERY_TYPE_BEGIN_TRX|QUERY_TYPE_WRITE|QUERY_TYPE_UNKNOWN)))
{
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Begin transaction, write or unspecified type, "
"routing to Master.")));
ret = master_dcb->func.write(master_dcb, querybuf);
atomic_add(&inst->stats.n_master, 1);
goto return_ret;
}
else
{
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Read-only query, routing to Slave.")));
ss_dassert(QUERY_IS_TYPE(qtype, QUERY_TYPE_READ));
ret = slave_dcb->func.write(slave_dcb, querybuf);
atomic_add(&inst->stats.n_slave, 1);
goto return_ret;
}
return_ret:
if (plainsqlbuf != NULL)
@ -1851,4 +1750,102 @@ static uint8_t getCapabilities (
return_rc:
return rc;
}
/**
* Execute in backends used by current router session.
* Save session variable commands to router session property
* struct. Thus, they can be replayed in backends which are
* started and joined later.
*
* Suppress redundant OK packets sent by backends.
*
* The first OK packet is replied to the client.
* Return true if succeed, false is returned if router session was closed or
* if execute_sescmd_in_backend failed.
*/
static bool route_session_write(
ROUTER_CLIENT_SES* router_cli_ses,
GWBUF* querybuf,
ROUTER_INSTANCE* inst,
unsigned char packet_type,
skygw_query_type_t qtype)
{
bool succp;
DCB* master_dcb;
DCB* slave_dcb;
rses_property_t* prop;
master_dcb = router_cli_ses->rses_dcb[BE_MASTER];
slave_dcb = router_cli_ses->rses_dcb[BE_SLAVE];
CHK_DCB(master_dcb);
CHK_DCB(slave_dcb);
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Session write, query type\t%s, packet type %s, "
"routing to all servers.",
STRQTYPE(qtype),
STRPACKETTYPE(packet_type))));
/**
* COM_QUIT is one-way message. Server doesn't respond to that.
* Therefore reply processing is unnecessary and session
* command property is not needed. It is just routed to both
* backends.
*/
if (packet_type == COM_QUIT)
{
int rc;
int rc2;
rc = master_dcb->func.write(master_dcb, gwbuf_clone(querybuf));
rc2 = slave_dcb->func.write(slave_dcb, querybuf);
if (rc == 1 && rc == rc2)
{
succp = true;
}
goto return_succp;
}
prop = rses_property_init(RSES_PROP_TYPE_SESCMD);
/**
* Additional reference is created to querybuf to
* prevent it from being released before properties
* are cleaned up as a part of router sessionclean-up.
*/
mysql_sescmd_init(prop, querybuf, packet_type, router_cli_ses);
/** Lock router session */
if (!rses_begin_locked_router_action(router_cli_ses))
{
rses_property_done(prop);
succp = false;
goto return_succp;
}
/** Add sescmd property to router client session */
rses_property_add(router_cli_ses, prop);
/** Execute session command in master */
succp = execute_sescmd_in_backend(router_cli_ses, BE_MASTER);
if (!succp)
{
/** Unlock router session */
rses_end_locked_router_action(router_cli_ses);
goto return_succp;
}
/** Execute session command in slave */
succp = execute_sescmd_in_backend(router_cli_ses, BE_SLAVE);
if (!succp)
{
/** Unlock router session */
rses_end_locked_router_action(router_cli_ses);
goto return_succp;
}
/** Unlock router session */
rses_end_locked_router_action(router_cli_ses);
atomic_add(&inst->stats.n_all, 1);
return_succp:
return succp;
}