MAX-10, Transaction support for MaxScale.

Naive implementation, which routes all statements to master between BEGIN|START TRANSACTION <options> and ROLLBACK|COMMIT
This commit is contained in:
VilhoRaatikka 2014-03-18 23:41:32 +02:00
parent d7e9978ac1
commit d6a9a5c1d0
4 changed files with 82 additions and 22 deletions

View File

@ -390,7 +390,7 @@ static skygw_query_type_t resolve_query_type(
/** SELECT ..INTO variable|OUTFILE|DUMPFILE */
if (lex->result != NULL) {
qtype = QUERY_TYPE_SESSION_WRITE;
goto return_here;
goto return_qtype;
}
/**
* 1:ALTER TABLE, TRUNCATE, REPAIR, OPTIMIZE, ANALYZE, CHECK.
@ -412,7 +412,7 @@ static skygw_query_type_t resolve_query_type(
qtype = QUERY_TYPE_WRITE;
}
goto return_here;
goto return_qtype;
}
/**
@ -428,7 +428,7 @@ static skygw_query_type_t resolve_query_type(
{
qtype = QUERY_TYPE_SESSION_WRITE;
}
goto return_here;
goto return_qtype;
}
/** Try to catch session modifications here */
@ -452,6 +452,21 @@ static skygw_query_type_t resolve_query_type(
qtype = QUERY_TYPE_WRITE;
break;
case SQLCOM_BEGIN:
qtype = QUERY_TYPE_BEGIN_TRX;
goto return_qtype;
break;
case SQLCOM_COMMIT:
qtype = QUERY_TYPE_COMMIT;
goto return_qtype;
break;
case SQLCOM_ROLLBACK:
qtype = QUERY_TYPE_ROLLBACK;
goto return_qtype;
break;
default:
break;
}
@ -603,6 +618,6 @@ static skygw_query_type_t resolve_query_type(
}
} /**< for */
} /**< if */
return_here:
return_qtype:
return qtype;
}

View File

@ -29,12 +29,15 @@ EXTERN_C_BLOCK_BEGIN
* is modified
*/
typedef enum {
QUERY_TYPE_UNKNOWN = 7, /*!< Couln't find out or parse error */
QUERY_TYPE_LOCAL_READ, /*!< Read non-database data, execute in MaxScale */
QUERY_TYPE_READ, /*!< No updates */
QUERY_TYPE_WRITE, /*!< Master data will be modified */
QUERY_TYPE_SESSION_WRITE,/*!< Session data will be modified */
QUERY_TYPE_GLOBAL_WRITE /*!< Global system variable modification */
QUERY_TYPE_UNKNOWN = 7, /*< Couln't find out or parse error */
QUERY_TYPE_LOCAL_READ, /*< Read non-database data, execute in MaxScale */
QUERY_TYPE_READ, /*< No updates */
QUERY_TYPE_WRITE, /*< Master data will be modified */
QUERY_TYPE_SESSION_WRITE,/*< Session data will be modified */
QUERY_TYPE_GLOBAL_WRITE, /*< Global system variable modification */
QUERY_TYPE_BEGIN_TRX, /*< BEGIN or START TRANSACTION */
QUERY_TYPE_ROLLBACK, /*< ROLLBACK */
QUERY_TYPE_COMMIT /*< COMMIT */
} skygw_query_type_t;

View File

@ -283,7 +283,6 @@ static int gw_read_backend_event(DCB *dcb) {
}
if (backend_protocol->state == MYSQL_AUTH_FAILED) {
spinlock_acquire(&dcb->delayqlock);
/*<
* vraa : errorHandle
@ -529,6 +528,7 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
* Don't write to backend if backend_dcb is not in poll set anymore.
*/
spinlock_acquire(&dcb->authlock);
if (dcb->state != DCB_STATE_POLLING) {
/*< vraa : errorHandle */
/*< Free buffer memory */
@ -546,7 +546,6 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
spinlock_release(&dcb->authlock);
return 0;
}
/*<
* Now put the incoming data to the delay queue unless backend is
* connected with auth ok

View File

@ -583,6 +583,7 @@ static int routeQuery(
bool rses_is_closed;
rses_property_t* prop;
size_t len;
static bool transaction_active;
CHK_CLIENT_RSES(router_cli_ses);
@ -700,16 +701,27 @@ static int routeQuery(
case QUERY_TYPE_READ:
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"%lu [routeQuery:rwsplit] Query type\t%s, "
"routing to Slave.",
pthread_self(),
STRQTYPE(qtype))));
LOGIF(LT, tracelog_routed_query(router_cli_ses,
"routeQuery",
slave_dcb,
gwbuf_clone(querybuf)));
ret = slave_dcb->func.write(slave_dcb, querybuf);
LOGFILE_TRACE,
"%lu [routeQuery:rwsplit] Query type\t%s, "
"routing to %s.",
pthread_self(),
STRQTYPE(qtype),
(transaction_active ? "Master" : "Slave"))));
LOGIF(LT, tracelog_routed_query(
router_cli_ses,
"routeQuery",
(transaction_active ? master_dcb : slave_dcb),
gwbuf_clone(querybuf)));
if (transaction_active)
{
ret = master_dcb->func.write(master_dcb, querybuf);
}
else
{
ret = slave_dcb->func.write(slave_dcb, querybuf);
}
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"%lu [routeQuery:rwsplit] Routed.",
@ -807,6 +819,37 @@ static int routeQuery(
goto return_ret;
break;
case QUERY_TYPE_BEGIN_TRX:
transaction_active = true;
LOGIF(LT, tracelog_routed_query(router_cli_ses,
"routeQuery",
master_dcb,
gwbuf_clone(querybuf)));
ret = master_dcb->func.write(master_dcb, querybuf);
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"%lu [routeQuery:rwsplit] Routed.",
pthread_self())));
atomic_add(&inst->stats.n_master, 1);
goto return_ret;
break;
case QUERY_TYPE_COMMIT:
case QUERY_TYPE_ROLLBACK:
transaction_active = false;
LOGIF(LT, tracelog_routed_query(router_cli_ses,
"routeQuery",
master_dcb,
gwbuf_clone(querybuf)));
ret = master_dcb->func.write(master_dcb, querybuf);
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"%lu [routeQuery:rwsplit] Routed.",
pthread_self())));
atomic_add(&inst->stats.n_master, 1);
goto return_ret;
break;
default:
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,