diff --git a/query_classifier/query_classifier.cc b/query_classifier/query_classifier.cc index 1976bd6b2..dbc6747b3 100644 --- a/query_classifier/query_classifier.cc +++ b/query_classifier/query_classifier.cc @@ -50,6 +50,7 @@ #include #include #include +#include #include @@ -73,9 +74,17 @@ static bool create_parse_tree( static skygw_query_type_t resolve_query_type( THD* thd); + static bool skygw_stmt_causes_implicit_commit( - LEX* lex, - uint mask); + LEX* lex, + bool* autocommit); + +static bool skygw_stmt_disables_autocommit( + LEX* lex); + +static bool is_autocommit_stmt( + LEX* lex, + bool enable_cmd); /*< true=enable, false=disable */ /** * @node (write brief function description here) @@ -374,7 +383,8 @@ static skygw_query_type_t resolve_query_type( THD* thd) { skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN; - u_int8_t type = QUERY_TYPE_UNKNOWN; + u_int8_t type = QUERY_TYPE_UNKNOWN; + bool is_set_autocommit = false;; LEX* lex; Item* item; /** @@ -397,7 +407,9 @@ static skygw_query_type_t resolve_query_type( goto return_qtype; } - if (skygw_stmt_causes_implicit_commit(lex, CF_AUTO_COMMIT_TRANS)) + if (skygw_stmt_causes_implicit_commit( + lex, + &is_set_autocommit)) { if (LOG_IS_ENABLED(LOGFILE_TRACE)) { @@ -418,7 +430,25 @@ static skygw_query_type_t resolve_query_type( "next command."); } } + + if (is_set_autocommit) + { + type |= QUERY_TYPE_ENABLE_AUTOCOMMIT; + } type |= QUERY_TYPE_COMMIT; + } + + if (skygw_stmt_disables_autocommit(lex)) + { + if (LOG_IS_ENABLED(LOGFILE_TRACE)) + { + skygw_log_write( + LOGFILE_TRACE, + "Disable autocommit : implicit START TRANSACTION" + " before executing the next command."); + } + type |= QUERY_TYPE_DISABLE_AUTOCOMMIT; + type |= QUERY_TYPE_BEGIN_TRX; } /** * REVOKE ALL, ASSIGN_TO_KEYCACHE, @@ -648,11 +678,17 @@ return_qtype: return qtype; } -static bool skygw_stmt_causes_implicit_commit(LEX* lex, uint mask) +/** + * Checks if statement causes implicit COMMIT. + * If SET autocommit=1 is called, sets autocommit pointer. + */ +static bool skygw_stmt_causes_implicit_commit( + LEX* lex, + bool* autocommit) { bool succp; - if (!(sql_command_flags[lex->sql_command] & mask)) + if (!(sql_command_flags[lex->sql_command] & CF_AUTO_COMMIT_TRANS)) { succp = false; goto return_succp; @@ -668,13 +704,63 @@ static bool skygw_stmt_causes_implicit_commit(LEX* lex, uint mask) succp = !(lex->create_info.options & HA_LEX_CREATE_TMP_TABLE); break; case SQLCOM_SET_OPTION: - succp = lex->autocommit ? true : false; + succp = is_autocommit_stmt(lex, true); + *autocommit = succp; break; default: succp = true; break; } - return_succp: return succp; } + +/** + * Finds out if SET autocommit=0 was called. + */ +static bool skygw_stmt_disables_autocommit( + LEX* lex) +{ + bool succp = false; + + if (lex->sql_command == SQLCOM_SET_OPTION) + { + succp = is_autocommit_stmt(lex, false); /*< SET autocommit=0 ? */ + goto return_succp; + } +return_succp: + return succp; +} + + +/** + * Finds out if stmt is SET autocommit + * and if the new value matches with the enable_cmd argument. + */ +static bool is_autocommit_stmt( + LEX* lex, + bool enable_cmd) /*< true=enable, false=disable */ +{ + struct list_node* node; + set_var* setvar; + bool succp = false; + char c = (enable_cmd ? '1' : '0'); + + node = lex->var_list.first_node(); + + while((setvar=(set_var*)node->info) != NULL) + { + if (strcmp( + "autocommit", + ((sys_var *)setvar->var)->name.str) + == 0 && + *((Item *)setvar->value)->name == c) + { + succp = true; + goto return_succp; + } + node = node->next; + } +return_succp: + return succp; +} \ No newline at end of file diff --git a/query_classifier/query_classifier.h b/query_classifier/query_classifier.h index 69a5f92da..15bb38e5f 100644 --- a/query_classifier/query_classifier.h +++ b/query_classifier/query_classifier.h @@ -29,15 +29,17 @@ EXTERN_C_BLOCK_BEGIN * is modified */ typedef enum { - QUERY_TYPE_UNKNOWN = 0, /*< Couln't find out or parse error */ - QUERY_TYPE_LOCAL_READ = (1<<0), /*< Read non-database data, execute in MaxScale */ - QUERY_TYPE_READ = (1<<1), /*< No updates */ - QUERY_TYPE_WRITE = (1<<2), /*< Master data will be modified */ - QUERY_TYPE_SESSION_WRITE = (1<<3), /*< Session data will be modified */ - QUERY_TYPE_GLOBAL_WRITE = (1<<4), /*< Global system variable modification */ - QUERY_TYPE_BEGIN_TRX = (1<<5), /*< BEGIN or START TRANSACTION */ - QUERY_TYPE_ROLLBACK = (1<<6), /*< ROLLBACK */ - QUERY_TYPE_COMMIT = (1<<7), /*< COMMIT */ + QUERY_TYPE_UNKNOWN = 0x00, /*< Couln't find out or parse error */ + QUERY_TYPE_LOCAL_READ = 0x01, /*< Read non-database data, execute in MaxScale */ + QUERY_TYPE_READ = 0x02, /*< No updates */ + QUERY_TYPE_WRITE = 0x04, /*< Master data will be modified */ + QUERY_TYPE_SESSION_WRITE = 0x08, /*< Session data will be modified */ + QUERY_TYPE_GLOBAL_WRITE = 0x10, /*< Global system variable modification */ + QUERY_TYPE_BEGIN_TRX = 0x20, /*< BEGIN or START TRANSACTION */ + QUERY_TYPE_ENABLE_AUTOCOMMIT = 0x30,/*< SET autocommit=1 */ + QUERY_TYPE_DISABLE_AUTOCOMMIT = 0x40,/*< SET autocommit=0 */ + QUERY_TYPE_ROLLBACK = 0x50, /*< ROLLBACK */ + QUERY_TYPE_COMMIT = 0x60 /*< COMMIT */ } skygw_query_type_t; #define QUERY_IS_TYPE(mask,type) ((mask & type) == type) diff --git a/server/core/gateway.c b/server/core/gateway.c index bd773c7f9..7b19b3de1 100644 --- a/server/core/gateway.c +++ b/server/core/gateway.c @@ -86,7 +86,7 @@ static char* server_options[] = { "--no-defaults", "--datadir=", "--language=", - "--skip-innodb", +// "--skip-innodb", "--default-storage-engine=myisam", NULL }; @@ -100,7 +100,7 @@ static char* server_groups[] = { "server", "server", "embedded", - "server", +// "server", "server", NULL }; diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index 5383c1eb6..b2ff80623 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -157,6 +157,13 @@ static void tracelog_routed_query( DCB* dcb, GWBUF* buf); +static bool route_session_write( + ROUTER_CLIENT_SES* router_client_ses, + GWBUF* querybuf, + ROUTER_INSTANCE* inst, + unsigned char packet_type, + skygw_query_type_t qtype); + static SPINLOCK instlock; static ROUTER_INSTANCE* instances; @@ -574,10 +581,12 @@ static int routeQuery( bool rses_is_closed; rses_property_t* prop; size_t len; - static bool transaction_active; + /** if false everything goes to master and session commands to slave too */ + static bool autocommit_enabled = true; + /** if true everything goes to master and session commands to slave too */ + static bool transaction_active = false; CHK_CLIENT_RSES(router_cli_ses); - /** Dirty read for quick check if router is closed. */ if (router_cli_ses->rses_closed) @@ -669,207 +678,97 @@ static int routeQuery( LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, "Packet type\t%s", STRPACKETTYPE(packet_type)))); - - if (QUERY_IS_TYPE(qtype,QUERY_TYPE_COMMIT) && - transaction_active) + /** + * If autocommit is disabled or transaction is explicitly started + * transaction becomes active and master gets all statements until + * transaction is committed and autocommit is enabled again. + */ + if (autocommit_enabled && + QUERY_IS_TYPE(qtype, QUERY_TYPE_DISABLE_AUTOCOMMIT)) + { + autocommit_enabled = false; + + if (!transaction_active) + { + transaction_active = true; + } + } + else if (!transaction_active && + QUERY_IS_TYPE(qtype, QUERY_TYPE_BEGIN_TRX)) + { + transaction_active = true; + } + /** + * Explicit COMMIT and ROLLBACK, implicit COMMIT. + */ + if (autocommit_enabled && + transaction_active && + QUERY_IS_TYPE(qtype,(QUERY_TYPE_COMMIT|QUERY_TYPE_ROLLBACK))) { transaction_active = false; + } + else if (!autocommit_enabled && + QUERY_IS_TYPE(qtype, QUERY_TYPE_ENABLE_AUTOCOMMIT)) + { + autocommit_enabled = true; + transaction_active = false; } - - - switch (qtype) { - case QUERY_TYPE_WRITE: - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "%lu [routeQuery:rwsplit] Query type\t%s, " - "routing to Master.", - pthread_self(), - STRQTYPE(qtype)))); - - LOGIF(LT, tracelog_routed_query(router_cli_ses, - "routeQuery", - master_dcb, - gwbuf_clone(querybuf))); - - ret = master_dcb->func.write(master_dcb, querybuf); - atomic_add(&inst->stats.n_master, 1); - - goto return_ret; - break; - - case QUERY_TYPE_READ: - LOGIF(LT, (skygw_log_write_flush( - LOGFILE_TRACE, - "%lu [routeQuery:rwsplit] Query type\t%s, " - "routing to %s.", - pthread_self(), - STRQTYPE(qtype), - (transaction_active ? "Master" : "Slave")))); - - LOGIF(LT, tracelog_routed_query( - router_cli_ses, - "routeQuery", - (transaction_active ? master_dcb : slave_dcb), - gwbuf_clone(querybuf))); - - if (transaction_active) - { - ret = master_dcb->func.write(master_dcb, querybuf); - } - else - { - ret = slave_dcb->func.write(slave_dcb, querybuf); - } - LOGIF(LT, (skygw_log_write_flush( - LOGFILE_TRACE, - "%lu [routeQuery:rwsplit] Routed.", - pthread_self()))); - - - atomic_add(&inst->stats.n_slave, 1); - goto return_ret; - break; - - case QUERY_TYPE_SESSION_WRITE: - /** - * Execute in backends used by current router session. - * Save session variable commands to router session property - * struct. Thus, they can be replayed in backends which are - * started and joined later. - * - * Suppress redundant OK packets sent by backends. - * - * DOES THIS ALL APPLY TO COM_QUIT AS WELL?? - * - * The first OK packet is replied to the client. - * - */ - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "%lu [routeQuery:rwsplit] DCB M:%p s:%p, " - "Query type\t%s, " - "packet type %s, routing to all servers.", - pthread_self(), - master_dcb, - slave_dcb, - STRQTYPE(qtype), - STRPACKETTYPE(packet_type)))); - /** - * COM_QUIT is one-way message. Server doesn't respond to that. - * Therefore reply processing is unnecessary and session - * command property is not needed. It is just routed to both - * backends. - */ - if (packet_type == COM_QUIT) - { - int rc; - int rc2; - - rc = master_dcb->func.write(master_dcb, gwbuf_clone(querybuf)); - rc2 = slave_dcb->func.write(slave_dcb, querybuf); - - if (rc == 1 && rc == rc2) - { - ret = 1; - } - goto return_ret; - } - prop = rses_property_init(RSES_PROP_TYPE_SESCMD); - /** - * Additional reference is created to querybuf to - * prevent it from being released before properties - * are cleaned up as a part of router sessionclean-up. - */ - mysql_sescmd_init(prop, querybuf, packet_type, router_cli_ses); - - /** Lock router session */ - if (!rses_begin_locked_router_action(router_cli_ses)) - { - rses_property_done(prop); - goto return_ret; - } - /** Add sescmd property to router client session */ - rses_property_add(router_cli_ses, prop); - - /** Execute session command in master */ - if (execute_sescmd_in_backend(router_cli_ses, BE_MASTER)) - { - ret = 1; - } - else - { - /** Log error */ - } - /** Execute session command in slave */ - if (execute_sescmd_in_backend(router_cli_ses, BE_SLAVE)) + /** + * Session update is always routed in the same way. + */ + if (QUERY_IS_TYPE(qtype, QUERY_TYPE_SESSION_WRITE)) + { + if (route_session_write( + router_cli_ses, + querybuf, + inst, + packet_type, + qtype)) { ret = 1; } - else + else { - /** Log error */ + ret = 0; } - - /** Unlock router session */ - rses_end_locked_router_action(router_cli_ses); - - atomic_add(&inst->stats.n_all, 1); goto return_ret; - break; - - case QUERY_TYPE_BEGIN_TRX: - transaction_active = true; - LOGIF(LT, tracelog_routed_query(router_cli_ses, - "routeQuery", - master_dcb, - gwbuf_clone(querybuf))); - ret = master_dcb->func.write(master_dcb, querybuf); - LOGIF(LT, (skygw_log_write_flush( - LOGFILE_TRACE, - "%lu [routeQuery:rwsplit] Routed.", - pthread_self()))); - atomic_add(&inst->stats.n_master, 1); - goto return_ret; - break; - - case QUERY_TYPE_COMMIT: - case QUERY_TYPE_ROLLBACK: - transaction_active = false; - LOGIF(LT, tracelog_routed_query(router_cli_ses, - "routeQuery", - master_dcb, - gwbuf_clone(querybuf))); - ret = master_dcb->func.write(master_dcb, querybuf); - LOGIF(LT, (skygw_log_write_flush( - LOGFILE_TRACE, - "%lu [routeQuery:rwsplit] Routed.", - pthread_self()))); - atomic_add(&inst->stats.n_master, 1); - goto return_ret; - break; - - default: + } + else if (transaction_active) /*< all to master */ + { LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "%lu [routeQuery:rwsplit] Query type\t%s, " - "routing to Master by default.", - pthread_self(), - STRQTYPE(qtype)))); - - /** - * Is this really ok? - * What is not known is routed to master. - */ - LOGIF(LT, tracelog_routed_query(router_cli_ses, - "routeQuery", - master_dcb, - gwbuf_clone(querybuf))); - + LOGFILE_TRACE, + "Transaction is active, routing to Master."))); + ret = master_dcb->func.write(master_dcb, querybuf); atomic_add(&inst->stats.n_master, 1); + goto return_ret; - break; - } /*< switch by query type */ + } + else if (QUERY_IS_TYPE( + qtype, + (QUERY_TYPE_BEGIN_TRX|QUERY_TYPE_WRITE|QUERY_TYPE_UNKNOWN))) + { + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "Begin transaction, write or unspecified type, " + "routing to Master."))); + ret = master_dcb->func.write(master_dcb, querybuf); + atomic_add(&inst->stats.n_master, 1); + + goto return_ret; + } + else + { + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "Read-only query, routing to Slave."))); + + ss_dassert(QUERY_IS_TYPE(qtype, QUERY_TYPE_READ)); + ret = slave_dcb->func.write(slave_dcb, querybuf); + atomic_add(&inst->stats.n_slave, 1); + + goto return_ret; + } return_ret: if (plainsqlbuf != NULL) @@ -1851,4 +1750,102 @@ static uint8_t getCapabilities ( return_rc: return rc; +} + +/** + * Execute in backends used by current router session. + * Save session variable commands to router session property + * struct. Thus, they can be replayed in backends which are + * started and joined later. + * + * Suppress redundant OK packets sent by backends. + * + * The first OK packet is replied to the client. + * Return true if succeed, false is returned if router session was closed or + * if execute_sescmd_in_backend failed. + */ +static bool route_session_write( + ROUTER_CLIENT_SES* router_cli_ses, + GWBUF* querybuf, + ROUTER_INSTANCE* inst, + unsigned char packet_type, + skygw_query_type_t qtype) +{ + bool succp; + DCB* master_dcb; + DCB* slave_dcb; + rses_property_t* prop; + + master_dcb = router_cli_ses->rses_dcb[BE_MASTER]; + slave_dcb = router_cli_ses->rses_dcb[BE_SLAVE]; + CHK_DCB(master_dcb); + CHK_DCB(slave_dcb); + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "Session write, query type\t%s, packet type %s, " + "routing to all servers.", + STRQTYPE(qtype), + STRPACKETTYPE(packet_type)))); + /** + * COM_QUIT is one-way message. Server doesn't respond to that. + * Therefore reply processing is unnecessary and session + * command property is not needed. It is just routed to both + * backends. + */ + if (packet_type == COM_QUIT) + { + int rc; + int rc2; + + rc = master_dcb->func.write(master_dcb, gwbuf_clone(querybuf)); + rc2 = slave_dcb->func.write(slave_dcb, querybuf); + + if (rc == 1 && rc == rc2) + { + succp = true; + } + goto return_succp; + } + prop = rses_property_init(RSES_PROP_TYPE_SESCMD); + /** + * Additional reference is created to querybuf to + * prevent it from being released before properties + * are cleaned up as a part of router sessionclean-up. + */ + mysql_sescmd_init(prop, querybuf, packet_type, router_cli_ses); + + /** Lock router session */ + if (!rses_begin_locked_router_action(router_cli_ses)) + { + rses_property_done(prop); + succp = false; + goto return_succp; + } + /** Add sescmd property to router client session */ + rses_property_add(router_cli_ses, prop); + + /** Execute session command in master */ + succp = execute_sescmd_in_backend(router_cli_ses, BE_MASTER); + + if (!succp) + { + /** Unlock router session */ + rses_end_locked_router_action(router_cli_ses); + goto return_succp; + } + /** Execute session command in slave */ + succp = execute_sescmd_in_backend(router_cli_ses, BE_SLAVE); + if (!succp) + { + /** Unlock router session */ + rses_end_locked_router_action(router_cli_ses); + goto return_succp; + } + /** Unlock router session */ + rses_end_locked_router_action(router_cli_ses); + + atomic_add(&inst->stats.n_all, 1); + +return_succp: + return succp; } \ No newline at end of file