Merge remote-tracking branch 'ybbct/MXS-1603' into develop

This commit is contained in:
Markus Mäkelä
2018-01-24 11:15:14 +02:00
9 changed files with 545 additions and 10 deletions

View File

@ -770,6 +770,9 @@ gw_read_and_write(DCB *dcb)
return 0;
}
/** Get sesion track info from ok packet and save it to gwbuf properties */
mxs_mysql_get_session_track_info(tmp, proto);
read_buffer = tmp;
if (rcap_type_required(capabilities, RCAP_TYPE_CONTIGUOUS_OUTPUT) ||

View File

@ -76,7 +76,7 @@ static spec_com_res_t process_special_commands(DCB *client_dcb, GWBUF *read_buff
static spec_com_res_t handle_query_kill(DCB* dcb, GWBUF* read_buffer, spec_com_res_t current,
bool is_complete, unsigned int packet_len);
static bool parse_kill_query(char *query, uint64_t *thread_id_out, kill_type_t *kt_out, std::string* user);
static void parse_and_set_trx_state(MXS_SESSION *ses, GWBUF *data);
/**
* The module entry point routine. It is this routine that
* must populate the structure that is referred to as the
@ -410,7 +410,11 @@ int MySQLSendHandshake(DCB* dcb)
* @param queue Queue of buffers to write
*/
int gw_MySQLWrite_client(DCB *dcb, GWBUF *queue)
{
{
if (GWBUF_IS_REPLY_OK(queue) && dcb->service->session_track_trx_state)
{
parse_and_set_trx_state(dcb->session, queue);
}
return dcb_write(dcb, queue);
}
@ -1266,7 +1270,6 @@ int gw_MySQLListener(DCB *listen_dcb, char *config_bind)
int gw_MySQLAccept(DCB *listener)
{
DCB *client_dcb;
MySQLProtocol *protocol;
CHK_DCB(listener);
@ -1509,8 +1512,8 @@ static int route_by_statement(MXS_SESSION* session, uint64_t capabilities, GWBUF
goto return_rc;
}
}
if (rcap_type_required(capabilities, RCAP_TYPE_TRANSACTION_TRACKING))
SERVICE *service = session->client_dcb->service;
if (rcap_type_required(capabilities, RCAP_TYPE_TRANSACTION_TRACKING) && !service->session_track_trx_state)
{
if (session_trx_is_ending(session))
{
@ -1889,3 +1892,64 @@ static bool parse_kill_query(char *query, uint64_t *thread_id_out, kill_type_t *
return true;
}
}
/*
* Mapping three session tracker's info to mxs_session_trx_state_t
* SESSION_TRACK_STATE_CHANGE:
* Get lasted autocommit value;
* https://dev.mysql.com/worklog/task/?id=6885
* SESSION_TRACK_TRANSACTION_TYPE:
* Get transaction boundaries
* TX_EMPTY => SESSION_TRX_INACTIVE
* TX_EXPLICIT | TX_IMPLICIT => SESSION_TRX_ACTIVE
* https://dev.mysql.com/worklog/task/?id=6885
* SESSION_TRACK_TRANSACTION_CHARACTERISTICS
* Get trx characteristics such as read only, read write, snapshot ...
*
*/
static void parse_and_set_trx_state(MXS_SESSION *ses, GWBUF *data)
{
char *autocommit = gwbuf_get_property(data, (char *)"autocommit");
if (autocommit)
{
MXS_DEBUG("autocommit:%s", autocommit);
if (strncasecmp(autocommit, "ON", 2) == 0)
{
session_set_autocommit(ses, true);
}
if (strncasecmp(autocommit, "OFF", 3) == 0)
{
session_set_autocommit(ses, false);
}
}
char *trx_state = gwbuf_get_property(data, (char *)"trx_state");
if (trx_state)
{
mysql_tx_state_t s = parse_trx_state(trx_state);
if (s == TX_EMPTY)
{
session_set_trx_state(ses, SESSION_TRX_INACTIVE);
}
else if ((s & TX_EXPLICIT) || (s & TX_IMPLICIT))
{
session_set_trx_state(ses, SESSION_TRX_ACTIVE);
}
}
char *trx_characteristics = gwbuf_get_property(data, (char *)"trx_characteristics");
if (trx_characteristics)
{
if (strncmp(trx_characteristics, "START TRANSACTION READ ONLY;", 28) == 0)
{
session_set_trx_state(ses, SESSION_TRX_READ_ONLY);
}
if (strncmp(trx_characteristics, "START TRANSACTION READ WRITE;", 29) == 0)
{
session_set_trx_state(ses, SESSION_TRX_READ_WRITE);
}
}
MXS_DEBUG("trx state:%s", session_trx_state_to_string(ses->trx_state));
MXS_DEBUG("autcommit:%s", session_is_autocommit(ses)?"ON":"OFF");
}

View File

@ -71,6 +71,7 @@ MySQLProtocol* mysql_protocol_init(DCB* dcb, int fd)
p->extra_capabilities = 0;
p->ignore_replies = 0;
p->collect_result = false;
p->num_eof_packets = 0;
#if defined(SS_DEBUG)
p->protocol_chk_top = CHK_NUM_PROTOCOL;
p->protocol_chk_tail = CHK_NUM_PROTOCOL;
@ -1228,6 +1229,8 @@ create_capabilities(MySQLProtocol *conn, bool with_ssl, bool db_specified, bool
/* Maybe it should depend on whether CA certificate is provided */
/* final_capabilities |= (uint32_t)GW_MYSQL_CAPABILITIES_SSL_VERIFY_SERVER_CERT; */
}
/** add session track */
final_capabilities |= (uint32_t)GW_MYSQL_CAPABILITIES_SESSION_TRACK;
/* Compression is not currently supported */
ss_dassert(!compress);
@ -1459,10 +1462,7 @@ int gw_decode_mysql_server_handshake(MySQLProtocol *conn, uint8_t *payload)
mysql_server_capabilities_two = gw_mysql_get_byte2(payload);
memcpy(capab_ptr, &mysql_server_capabilities_one, 2);
// get capabilities part 2 (2 bytes)
memcpy(&capab_ptr[2], &mysql_server_capabilities_two, 2);
conn->server_capabilities = mysql_server_capabilities_one | mysql_server_capabilities_two << 16;
// 2 bytes shift
payload += 2;
@ -1768,3 +1768,197 @@ void mxs_mysql_execute_kill_user(MXS_SESSION* issuer, const char* user, kill_typ
mxs_mysql_send_ok(issuer->client_dcb, info.targets.size(), 0, NULL);
}
/**
* Parse ok packet to get session track info, save to buff properties
* @param buff Buffer contain multi compelte packets
* @param packet_offset Ok packet offset in this buff
* @param packet_len Ok packet lengh
*/
void mxs_mysql_parse_ok_packet(GWBUF *buff, size_t packet_offset, size_t packet_len)
{
uint8_t local_buf[packet_len];
uint8_t *ptr = local_buf;
char *trx_info, *var_name, *var_value;
gwbuf_copy_data(buff, packet_offset, packet_len, local_buf);
ptr += (MYSQL_HEADER_LEN + 1); // Header and Command type
mxs_leint_consume(&ptr); // Affected rows
mxs_leint_consume(&ptr); // Last insert-id
uint16_t server_status = gw_mysql_get_byte2(ptr);
ptr += 2; // status
ptr += 2; // number of warnings
if (ptr < (local_buf + packet_len))
{
ptr += mxs_leint_consume(&ptr); // info
if (server_status & SERVER_SESSION_STATE_CHANGED)
{
mxs_leint_consume(&ptr); // total SERVER_SESSION_STATE_CHANGED length
while (ptr < (local_buf + packet_len))
{
enum_session_state_type type =
(enum enum_session_state_type)mxs_leint_consume(&ptr);
#if defined(SS_DEBUG)
ss_dassert(type <= SESSION_TRACK_TRANSACTION_TYPE);
#endif
switch (type)
{
case SESSION_TRACK_STATE_CHANGE:
case SESSION_TRACK_SCHEMA:
case SESSION_TRACK_GTIDS:
ptr += mxs_leint_consume(&ptr);
break;
case SESSION_TRACK_TRANSACTION_CHARACTERISTICS:
mxs_leint_consume(&ptr); //length
var_value = mxs_lestr_consume_dup(&ptr);
gwbuf_add_property(buff, (char *)"trx_characteristics", var_value);
MXS_FREE(var_value);
break;
case SESSION_TRACK_SYSTEM_VARIABLES:
mxs_leint_consume(&ptr); //lenth
// system variables like autocommit, schema, charset ...
var_name = mxs_lestr_consume_dup(&ptr);
var_value = mxs_lestr_consume_dup(&ptr);
gwbuf_add_property(buff, var_name, var_value);
MXS_DEBUG("SESSION_TRACK_SYSTEM_VARIABLES, name:%s, value:%s", var_name, var_value);
MXS_FREE(var_name);
MXS_FREE(var_value);
break;
case SESSION_TRACK_TRANSACTION_TYPE:
mxs_leint_consume(&ptr); // length
trx_info = mxs_lestr_consume_dup(&ptr);
MXS_DEBUG("get trx_info:%s", trx_info);
gwbuf_add_property(buff, (char *)"trx_state", trx_info);
MXS_FREE(trx_info);
break;
default:
ptr += mxs_leint_consume(&ptr);
MXS_WARNING("recieved unexpecting session track type:%d", type);
break;
}
}
}
}
}
/**
* Check every packet type, if is ok packet then parse it
* @param buff Buffer contain multi compelte packets
* @param server_capabilities Server capabilities
*/
void mxs_mysql_get_session_track_info(GWBUF *buff, MySQLProtocol *proto)
{
size_t offset = 0;
uint8_t header_and_command[MYSQL_HEADER_LEN+1];
if (proto->server_capabilities&GW_MYSQL_CAPABILITIES_SESSION_TRACK)
{
while (gwbuf_copy_data(buff, offset, MYSQL_HEADER_LEN+1, header_and_command) == (MYSQL_HEADER_LEN+1))
{
size_t packet_len = gw_mysql_get_byte3(header_and_command);
uint8_t cmd = header_and_command[MYSQL_COM_OFFSET];
if (packet_len > MYSQL_OK_PACKET_MIN_LEN &&
cmd == MYSQL_REPLY_OK &&
(proto->num_eof_packets % 2) == 0)
{
buff->gwbuf_type |= GWBUF_TYPE_REPLY_OK;
mxs_mysql_parse_ok_packet(buff, offset, packet_len + MYSQL_HEADER_LEN);
}
if ((proto->current_command == MXS_COM_QUERY ||
proto->current_command == MXS_COM_STMT_FETCH ||
proto->current_command == MXS_COM_STMT_EXECUTE) &&
cmd == MYSQL_REPLY_EOF)
{
proto->num_eof_packets++;
}
offset += (packet_len + MYSQL_HEADER_LEN);
}
}
}
/***
* As described in https://dev.mysql.com/worklog/task/?id=6631
* When session transation state changed
* SESSION_TRACK_TRANSACTION_TYPE (or SESSION_TRACK_TRANSACTION_STATE in MySQL) will
* return an 8 bytes string to indicate the transaction state details
* Place 1: Transaction.
* T explicitly started transaction ongoing
* I implicitly started transaction (@autocommit=0) ongoing
* _ no active transaction
*
* Place 2: unsafe read
* r one/several non-transactional tables were read
* in the context of the current transaction
* _ no non-transactional tables were read within
* the current transaction so far
*
* Place 3: transactional read
* R one/several transactional tables were read
* _ no transactional tables were read yet
*
* Place 4: unsafe write
* w one/several non-transactional tables were written
* _ no non-transactional tables were written yet
*
* Place 5: transactional write
* W one/several transactional tables were written to
* _ no transactional tables were written to yet
*
* Place 6: unsafe statements
* s one/several unsafe statements (such as UUID())
* were used.
* _ no such statements were used yet.
*
* Place 7: result-set
* S a result set was sent to the client
* _ statement had no result-set
*
* Place 8: LOCKed TABLES
* L tables were explicitly locked using LOCK TABLES
* _ LOCK TABLES is not active in this session
* */
mysql_tx_state_t parse_trx_state(const char *str)
{
int s = TX_EMPTY;
ss_dassert(str);
do
{
switch (*str)
{
case 'T':
s |= TX_EXPLICIT;
break;
case 'I':
s |= TX_IMPLICIT;
break;
case 'r':
s |= TX_READ_UNSAFE;
break;
case 'R':
s |= TX_READ_TRX;
break;
case 'w':
s |= TX_WRITE_UNSAFE;
break;
case 'W':
s |= TX_WRITE_TRX;
break;
case 's':
s |= TX_STMT_UNSAFE;
break;
case 'S':
s |= TX_RESULT_SET;
break;
case 'L':
s |= TX_LOCKED_TABLES;
break;
default:
break;
}
} while(*(str++) != 0);
return (mysql_tx_state_t)s;
}