get session transation state from backend via session track mechanism

This commit is contained in:
Dapeng Huang
2018-01-14 12:23:38 +08:00
parent 27e563dcff
commit e1aeac8b07
16 changed files with 231 additions and 32 deletions

View File

@ -9,7 +9,7 @@ set(MARIADB_CONNECTOR_C_REPO "https://github.com/MariaDB/mariadb-connector-c.git
CACHE STRING "MariaDB Connector-C Git repository")
# Release 2.3.3 (preliminary) of the Connector-C
set(MARIADB_CONNECTOR_C_TAG "v3.0.2"
set(MARIADB_CONNECTOR_C_TAG "master"
CACHE STRING "MariaDB Connector-C Git tag")
ExternalProject_Add(connector-c

View File

@ -59,6 +59,7 @@ typedef enum
GWBUF_TYPE_IGNORABLE = 0x10,
GWBUF_TYPE_COLLECT_RESULT = 0x20,
GWBUF_TYPE_RESULT = 0x40,
GWBUF_TYPE_REPLY_OK = 0x80,
} gwbuf_type_t;
#define GWBUF_IS_TYPE_UNDEFINED(b) (b->gwbuf_type == 0)
@ -68,6 +69,7 @@ typedef enum
#define GWBUF_IS_IGNORABLE(b) (b->gwbuf_type & GWBUF_TYPE_IGNORABLE)
#define GWBUF_IS_COLLECTED_RESULT(b) (b->gwbuf_type & GWBUF_TYPE_RESULT)
#define GWBUF_SHOULD_COLLECT_RESULT(b) (b->gwbuf_type & GWBUF_TYPE_COLLECT_RESULT)
#define GWBUF_IS_REPLY_OK(b) (b->gwbuf_type & GWBUF_TYPE_REPLY_OK)
typedef enum
{

View File

@ -54,16 +54,17 @@ MXS_BEGIN_DECLS
#define MXS_JSON_PTR_RELATIONSHIPS_FILTERS "/data/relationships/filters/data"
/** Parameter value JSON Pointers */
#define MXS_JSON_PTR_PARAM_PORT MXS_JSON_PTR_PARAMETERS "/port"
#define MXS_JSON_PTR_PARAM_ADDRESS MXS_JSON_PTR_PARAMETERS "/address"
#define MXS_JSON_PTR_PARAM_PROTOCOL MXS_JSON_PTR_PARAMETERS "/protocol"
#define MXS_JSON_PTR_PARAM_AUTHENTICATOR MXS_JSON_PTR_PARAMETERS "/authenticator"
#define MXS_JSON_PTR_PARAM_AUTHENTICATOR_OPTIONS MXS_JSON_PTR_PARAMETERS "/authenticator_options"
#define MXS_JSON_PTR_PARAM_SSL_KEY MXS_JSON_PTR_PARAMETERS "/ssl_key"
#define MXS_JSON_PTR_PARAM_SSL_CERT MXS_JSON_PTR_PARAMETERS "/ssl_cert"
#define MXS_JSON_PTR_PARAM_SSL_CA_CERT MXS_JSON_PTR_PARAMETERS "/ssl_ca_cert"
#define MXS_JSON_PTR_PARAM_SSL_VERSION MXS_JSON_PTR_PARAMETERS "/ssl_version"
#define MXS_JSON_PTR_PARAM_SSL_CERT_VERIFY_DEPTH MXS_JSON_PTR_PARAMETERS "/ssl_cert_verify_depth"
#define MXS_JSON_PTR_PARAM_PORT MXS_JSON_PTR_PARAMETERS "/port"
#define MXS_JSON_PTR_PARAM_ADDRESS MXS_JSON_PTR_PARAMETERS "/address"
#define MXS_JSON_PTR_PARAM_PROTOCOL MXS_JSON_PTR_PARAMETERS "/protocol"
#define MXS_JSON_PTR_PARAM_AUTHENTICATOR MXS_JSON_PTR_PARAMETERS "/authenticator"
#define MXS_JSON_PTR_PARAM_AUTHENTICATOR_OPTIONS MXS_JSON_PTR_PARAMETERS "/authenticator_options"
#define MXS_JSON_PTR_PARAM_SSL_KEY MXS_JSON_PTR_PARAMETERS "/ssl_key"
#define MXS_JSON_PTR_PARAM_SSL_CERT MXS_JSON_PTR_PARAMETERS "/ssl_cert"
#define MXS_JSON_PTR_PARAM_SSL_CA_CERT MXS_JSON_PTR_PARAMETERS "/ssl_ca_cert"
#define MXS_JSON_PTR_PARAM_SSL_VERSION MXS_JSON_PTR_PARAMETERS "/ssl_version"
#define MXS_JSON_PTR_PARAM_SSL_CERT_VERIFY_DEPTH MXS_JSON_PTR_PARAMETERS "/ssl_cert_verify_depth"
#define MXS_JSON_PTR_PARAM_SESSION_TRACK_TRX_STATE MXS_JSON_PTR_PARAMETERS "/session_track_trx_state"
/** Non-parameter JSON pointers */
#define MXS_JSON_PTR_MODULE "/data/attributes/module"

View File

@ -49,6 +49,7 @@ typedef struct servlistener
SPINLOCK lock;
int active; /**< True if the port has not been deleted */
struct servlistener *next; /**< Next service protocol */
bool session_track_trx_state; /** Get transation state from backend */
} SERV_LISTENER; // TODO: Rename to LISTENER
typedef struct listener_iterator
@ -79,7 +80,7 @@ json_t* listener_to_json(const SERV_LISTENER* listener);
SERV_LISTENER* listener_alloc(struct service* service, const char* name, const char *protocol,
const char *address, unsigned short port, const char *authenticator,
const char* auth_options, SSL_LISTENER *ssl);
const char* auth_options, SSL_LISTENER *ssl, bool get_trx_state_from_backend);
void listener_free(SERV_LISTENER* listener);
int listener_set_ssl_version(SSL_LISTENER *ssl_listener, char* version);
void listener_set_certificates(SSL_LISTENER *ssl_listener, char* cert, char* key, char* ca_cert);

View File

@ -118,6 +118,20 @@ MXS_BEGIN_DECLS
#define COM_QUIT_PACKET_SIZE (4+1)
struct dcb;
typedef enum
{
TX_EMPTY = 0, ///< "none of the below"
TX_EXPLICIT = 1, ///< an explicit transaction is active
TX_IMPLICIT = 2, ///< an implicit transaction is active
TX_READ_TRX = 4, ///< transactional reads were done
TX_READ_UNSAFE = 8, ///< non-transaction reads were done
TX_WRITE_TRX = 16, ///< transactional writes were done
TX_WRITE_UNSAFE = 32, ///< non-transactional writes were done
TX_STMT_UNSAFE = 64, ///< "unsafe" (non-deterministic like UUID()) stmts
TX_RESULT_SET = 128, ///< result-set was sent
TX_WITH_SNAPSHOT= 256, ///< WITH CONSISTENT SNAPSHOT was used
TX_LOCKED_TABLES= 512 ///< LOCK TABLES is active
} mysql_tx_state_t;
typedef enum
{
@ -339,6 +353,7 @@ typedef struct
int ignore_replies; /*< How many replies should be discarded */
GWBUF* stored_query; /*< Temporarily stored queries */
bool collect_result; /*< Collect the next result set as one buffer */
bool session_track_trx_state; /*< Get transation state from backend */
#if defined(SS_DEBUG)
skygw_chk_t protocol_chk_tail;
#endif
@ -478,6 +493,8 @@ char* create_auth_fail_str(char *username, char *hostaddr, bool password, char *
void init_response_status(GWBUF* buf, uint8_t cmd, int* npackets, size_t* nbytes);
bool read_complete_packet(DCB *dcb, GWBUF **readbuf);
bool gw_get_shared_session_auth_info(DCB* dcb, MYSQL_session* session);
void mxs_mysql_get_session_track_info(GWBUF *buff, uint32_t server_capabilities);
mysql_tx_state_t parse_trx_state(char *str);
/**
* Decode server handshake

View File

@ -147,6 +147,7 @@ const char CN_USER[] = "user";
const char CN_USERS[] = "users";
const char CN_VERSION_STRING[] = "version_string";
const char CN_WEIGHTBY[] = "weightby";
const char CN_SESSION_TRACK_TRX_STATE[] = "session_track_trx_state";
typedef struct duplicate_context
{
@ -233,6 +234,7 @@ const char *config_listener_params[] =
CN_SSL_VERSION,
CN_SSL_CERT_VERIFY_DEPTH,
CN_SSL_VERIFY_PEER_CERTIFICATE,
CN_SESSION_TRACK_TRX_STATE,
NULL
};
@ -3369,7 +3371,7 @@ int create_new_listener(CONFIG_CONTEXT *obj)
char *socket = config_get_value(obj->parameters, CN_SOCKET);
char *authenticator = config_get_value(obj->parameters, CN_AUTHENTICATOR);
char *authenticator_options = config_get_value(obj->parameters, CN_AUTHENTICATOR_OPTIONS);
bool session_track_trx_state = config_get_bool(obj->parameters, CN_SESSION_TRACK_TRX_STATE);
if (raw_service_name && protocol && (socket || port))
{
char service_name[strlen(raw_service_name) + 1];
@ -3391,7 +3393,7 @@ int create_new_listener(CONFIG_CONTEXT *obj)
else
{
serviceCreateListener(service, obj->object, protocol, socket, 0,
authenticator, authenticator_options, ssl_info);
authenticator, authenticator_options, ssl_info, session_track_trx_state);
}
}
@ -3408,7 +3410,7 @@ int create_new_listener(CONFIG_CONTEXT *obj)
else
{
serviceCreateListener(service, obj->object, protocol, address, atoi(port),
authenticator, authenticator_options, ssl_info);
authenticator, authenticator_options, ssl_info, session_track_trx_state);
}
}

View File

@ -795,7 +795,7 @@ bool runtime_create_listener(SERVICE *service, const char *name, const char *add
const char *port, const char *proto, const char *auth,
const char *auth_opt, const char *ssl_key,
const char *ssl_cert, const char *ssl_ca,
const char *ssl_version, const char *ssl_depth)
const char *ssl_version, const char *ssl_depth, bool session_track_trx_state)
{
if (addr == NULL || strcasecmp(addr, CN_DEFAULT) == 0)
@ -842,7 +842,7 @@ bool runtime_create_listener(SERVICE *service, const char *name, const char *add
{
const char *print_addr = addr ? addr : "::";
SERV_LISTENER *listener = serviceCreateListener(service, name, proto, addr,
u_port, auth, auth_opt, ssl);
u_port, auth, auth_opt, ssl, session_track_trx_state);
if (listener && listener_serialize(listener))
{
@ -1961,11 +1961,12 @@ bool runtime_create_listener_from_json(SERVICE* service, json_t* json)
const char* ssl_ca_cert = get_string_or_null(json, MXS_JSON_PTR_PARAM_SSL_CA_CERT);
const char* ssl_version = get_string_or_null(json, MXS_JSON_PTR_PARAM_SSL_VERSION);
const char* ssl_cert_verify_depth = get_string_or_null(json, MXS_JSON_PTR_PARAM_SSL_CERT_VERIFY_DEPTH);
bool session_track_trx_state = is_bool_or_null(json, MXS_JSON_PTR_PARAM_SESSION_TRACK_TRX_STATE);
rval = runtime_create_listener(service, id, address, port.c_str(), protocol,
authenticator, authenticator_options,
ssl_key, ssl_cert, ssl_ca_cert, ssl_version,
ssl_cert_verify_depth);
ssl_cert_verify_depth, session_track_trx_state);
}
return rval;

View File

@ -166,7 +166,8 @@ bool runtime_create_listener(SERVICE *service, const char *name, const char *add
const char *port, const char *proto, const char *auth,
const char *auth_opt, const char *ssl_key,
const char *ssl_cert, const char *ssl_ca,
const char *ssl_version, const char *ssl_depth);
const char *ssl_version, const char *ssl_depth,
bool session_track_trx_state);
/**
* @brief Destroy a listener

View File

@ -84,7 +84,7 @@ bool service_thread_init();
SERV_LISTENER* serviceCreateListener(SERVICE *service, const char *name,
const char *protocol, const char *address,
unsigned short port, const char *authenticator,
const char *options, SSL_LISTENER *ssl);
const char *options, SSL_LISTENER *ssl, bool session_track_trx_state);
void serviceRemoveBackend(SERVICE *service, const SERVER *server);

View File

@ -62,7 +62,7 @@ static RSA *tmp_rsa_callback(SSL *s, int is_export, int keylength);
SERV_LISTENER *
listener_alloc(struct service* service, const char* name, const char *protocol,
const char *address, unsigned short port, const char *authenticator,
const char* auth_options, SSL_LISTENER *ssl)
const char* auth_options, SSL_LISTENER *ssl, bool session_track_trx_state)
{
char *my_address = NULL;
if (address)
@ -135,6 +135,7 @@ listener_alloc(struct service* service, const char* name, const char *protocol,
proto->users = NULL;
proto->next = NULL;
proto->auth_instance = auth_instance;
proto->session_track_trx_state = session_track_trx_state;
spinlock_init(&proto->lock);
return proto;

View File

@ -755,10 +755,10 @@ static void service_add_listener(SERVICE* service, SERV_LISTENER* proto)
*/
SERV_LISTENER* serviceCreateListener(SERVICE *service, const char *name, const char *protocol,
const char *address, unsigned short port, const char *authenticator,
const char *options, SSL_LISTENER *ssl)
const char *options, SSL_LISTENER *ssl, bool session_track_trx_state)
{
SERV_LISTENER *proto = listener_alloc(service, name, protocol, address,
port, authenticator, options, ssl);
port, authenticator, options, ssl, session_track_trx_state);
if (proto)
{

View File

@ -71,7 +71,7 @@ test1()
ss_dfprintf(stderr, "\t..done\nAdding protocol testprotocol.");
set_libdir(MXS_STRDUP_A("../../modules/authenticator/MySQLAuth/"));
ss_info_dassert(serviceCreateListener(service, "TestProtocol", "testprotocol",
"localhost", 9876, "MySQLAuth", NULL, NULL),
"localhost", 9876, "MySQLAuth", NULL, NULL, false),
"Add Protocol should succeed");
ss_info_dassert(0 != serviceHasListener(service, "TestProtocol", "testprotocol", "localhost", 9876),
"Service should have new protocol as requested");

View File

@ -977,6 +977,8 @@ gw_read_and_write(DCB *dcb)
gwbuf_set_type(stmt, GWBUF_TYPE_RESULT);
}
mxs_mysql_get_session_track_info(stmt, proto->server_capabilities);
session->service->router->clientReply(session->service->router_instance,
session->router_session,
stmt, dcb);

View File

@ -76,7 +76,7 @@ static spec_com_res_t process_special_commands(DCB *client_dcb, GWBUF *read_buff
static spec_com_res_t handle_query_kill(DCB* dcb, GWBUF* read_buffer, spec_com_res_t current,
bool is_complete, unsigned int packet_len);
static bool parse_kill_query(char *query, uint64_t *thread_id_out, kill_type_t *kt_out, std::string* user);
static void parse_and_set_trx_state(MXS_SESSION *ses, GWBUF *data);
/**
* The module entry point routine. It is this routine that
* must populate the structure that is referred to as the
@ -411,6 +411,11 @@ int MySQLSendHandshake(DCB* dcb)
*/
int gw_MySQLWrite_client(DCB *dcb, GWBUF *queue)
{
MySQLProtocol *protocol = DCB_PROTOCOL(dcb, MySQLProtocol);
if (GWBUF_IS_REPLY_OK(queue) && protocol->session_track_trx_state)
{
parse_and_set_trx_state(dcb->session, queue);
}
return dcb_write(dcb, queue);
}
@ -1266,7 +1271,8 @@ int gw_MySQLListener(DCB *listen_dcb, char *config_bind)
int gw_MySQLAccept(DCB *listener)
{
DCB *client_dcb;
MySQLProtocol *protocol;
MySQLProtocol *cli_proto;
SERV_LISTENER *listener_proto;
CHK_DCB(listener);
@ -1279,6 +1285,9 @@ int gw_MySQLAccept(DCB *listener)
while ((client_dcb = dcb_accept(listener)) != NULL)
{
gw_process_one_new_client(client_dcb);
cli_proto = DCB_PROTOCOL(client_dcb, MySQLProtocol);
listener_proto = (SERV_LISTENER *)listener->listener;
cli_proto->session_track_trx_state = listener_proto->session_track_trx_state;
} /**< while client_dcb != NULL */
}
@ -1510,7 +1519,8 @@ static int route_by_statement(MXS_SESSION* session, uint64_t capabilities, GWBUF
}
}
if (rcap_type_required(capabilities, RCAP_TYPE_TRANSACTION_TRACKING))
MySQLProtocol *protocol = DCB_PROTOCOL(session->client_dcb, MySQLProtocol);
if (rcap_type_required(capabilities, RCAP_TYPE_TRANSACTION_TRACKING) && !protocol->session_track_trx_state)
{
if (session_trx_is_ending(session))
{
@ -1889,3 +1899,42 @@ static bool parse_kill_query(char *query, uint64_t *thread_id_out, kill_type_t *
return true;
}
}
static void parse_and_set_trx_state(MXS_SESSION *ses, GWBUF *data)
{
char *autocommit = gwbuf_get_property(data, (char *)"autocommit");
if (autocommit)
{
if (strncasecmp(autocommit, "0", 1) == 0)
{
session_set_autocommit(ses, true);
}
if (strncasecmp(autocommit, "1", 1) == 0)
{
session_set_autocommit(ses, false);
}
}
char *trx_state = gwbuf_get_property(data, (char *)"trx_state");
if (trx_state)
{
mysql_tx_state_t s = parse_trx_state(trx_state);
MXS_DEBUG("parsed tx state:%d", s);
if (s == TX_EMPTY)
{
session_set_trx_state(ses, SESSION_TRX_INACTIVE);
}
else if(s & TX_READ_TRX)
{
session_set_trx_state(ses, SESSION_TRX_READ_ONLY);
}
else if(s & TX_WRITE_TRX)
{
session_set_trx_state(ses, SESSION_TRX_READ_WRITE);
}
else if((s & TX_EXPLICIT) | (s & TX_IMPLICIT))
{
session_set_trx_state(ses, SESSION_TRX_ACTIVE);
}
}
}

View File

@ -1228,6 +1228,8 @@ create_capabilities(MySQLProtocol *conn, bool with_ssl, bool db_specified, bool
/* Maybe it should depend on whether CA certificate is provided */
/* final_capabilities |= (uint32_t)GW_MYSQL_CAPABILITIES_SSL_VERIFY_SERVER_CERT; */
}
/** add session track */
final_capabilities |= (uint32_t)GW_MYSQL_CAPABILITIES_SESSION_TRACK;
/* Compression is not currently supported */
ss_dassert(!compress);
@ -1459,10 +1461,7 @@ int gw_decode_mysql_server_handshake(MySQLProtocol *conn, uint8_t *payload)
mysql_server_capabilities_two = gw_mysql_get_byte2(payload);
memcpy(capab_ptr, &mysql_server_capabilities_one, 2);
// get capabilities part 2 (2 bytes)
memcpy(&capab_ptr[2], &mysql_server_capabilities_two, 2);
conn->server_capabilities = mysql_server_capabilities_one | mysql_server_capabilities_two << 16;
// 2 bytes shift
payload += 2;
@ -1768,3 +1767,126 @@ void mxs_mysql_execute_kill_user(MXS_SESSION* issuer, const char* user, kill_typ
mxs_mysql_send_ok(issuer->client_dcb, info.targets.size(), 0, NULL);
}
/**
* Decode ok packet and get session track info
*
* @param buffer Buffer contain ok packet
* @param server_capabilities Server capabilities
*
*/
void mxs_mysql_get_session_track_info(GWBUF *buff, uint32_t server_capabilities)
{
char *trx_info, *var_name, *var_value;
if (GWBUF_LENGTH(buff) < 9 || !mxs_mysql_is_ok_packet(buff))
{
return;
}
if (!GWBUF_IS_CONTIGUOUS(buff))
{
buff = gwbuf_make_contiguous(buff);
}
buff->gwbuf_type |= GWBUF_TYPE_REPLY_OK;
uint8_t* ptr = GWBUF_DATA(buff);
ptr += (MYSQL_COM_OFFSET + 1); // Header and Command type
mxs_leint_consume(&ptr); // Affected rows
mxs_leint_consume(&ptr); // Last insert-id
uint16_t server_status = gw_mysql_get_byte2(ptr);
ptr += 2; // status
ptr += 2; // number of warnings
if (server_capabilities & GW_MYSQL_CAPABILITIES_SESSION_TRACK)
{
if (ptr < buff->end) {
ptr += mxs_leint_consume(&ptr); // info
if (server_status & SERVER_SESSION_STATE_CHANGED)
{
mxs_leint_consume(&ptr); // total SERVER_SESSION_STATE_CHANGED length
while (ptr < buff->end)
{
enum_session_state_type type = (enum enum_session_state_type)mxs_leint_consume(&ptr);
switch (type)
{
case SESSION_TRACK_SYSTEM_VARIABLES:
case SESSION_TRACK_SCHEMA:
case SESSION_TRACK_GTIDS:
case SESSION_TRACK_TRANSACTION_CHARACTERISTICS:
ptr += mxs_leint_consume(&ptr);
break;
case SESSION_TRACK_STATE_CHANGE:
mxs_leint_consume(&ptr); //lenth
// system variables like autocommit, schema, charset ...
var_name = mxs_lestr_consume_dup(&ptr);
var_value = mxs_lestr_consume_dup(&ptr);
gwbuf_add_property(buff, var_name, var_value);
MXS_FREE(var_name);
MXS_FREE(var_value);
break;
case SESSION_TRACK_TRANSACTION_TYPE:
mxs_leint_consume(&ptr); // length
trx_info = mxs_lestr_consume_dup(&ptr);
MXS_INFO("get trx_info:%s", trx_info);
gwbuf_add_property(buff, (char *)"trx_state", trx_info);
MXS_FREE(trx_info);
break;
default:
ptr += mxs_leint_consume(&ptr);
MXS_WARNING("recieved unexpecting session track type:%d", type);
break;
}
}
}
}
}
}
mysql_tx_state_t parse_trx_state(char *str)
{
int s = TX_EMPTY;
assert(str);
do
{
switch (*str)
{
case 'T':
s |= TX_EXPLICIT;
break;
case 'I':
s |= TX_IMPLICIT;
break;
case 'r':
s |= TX_READ_UNSAFE;
break;
case 'R':
s |= TX_READ_TRX;
break;
case 'w':
s |= TX_WRITE_UNSAFE;
break;
case 'W':
s |= TX_WRITE_TRX;
break;
case 's':
s |= TX_STMT_UNSAFE;
break;
case 'S':
s |= TX_RESULT_SET;
break;
case 'L':
s |= TX_LOCKED_TABLES;
break;
default:
break;
}
} while(*(str++) != 0);
return (mysql_tx_state_t)s;
}

View File

@ -1169,7 +1169,7 @@ static void createListener(DCB *dcb, SERVICE *service, char *name, char *address
{
if (runtime_create_listener(service, name, address, port, protocol,
authenticator, authenticator_options,
key, cert, ca, version, depth))
key, cert, ca, version, depth, false))
{
dcb_printf(dcb, "Listener '%s' created\n", name);
}