refactor, check every packet before parser ok packet, move config to service, fix code style, ...

This commit is contained in:
Dapeng Huang
2018-01-15 20:25:44 +08:00
parent e1aeac8b07
commit d234b13027
17 changed files with 76 additions and 69 deletions

View File

@ -9,7 +9,7 @@ set(MARIADB_CONNECTOR_C_REPO "https://github.com/MariaDB/mariadb-connector-c.git
CACHE STRING "MariaDB Connector-C Git repository")
# Release 2.3.3 (preliminary) of the Connector-C
set(MARIADB_CONNECTOR_C_TAG "master"
set(MARIADB_CONNECTOR_C_TAG "v3.0.2"
CACHE STRING "MariaDB Connector-C Git tag")
ExternalProject_Add(connector-c

View File

@ -54,17 +54,16 @@ MXS_BEGIN_DECLS
#define MXS_JSON_PTR_RELATIONSHIPS_FILTERS "/data/relationships/filters/data"
/** Parameter value JSON Pointers */
#define MXS_JSON_PTR_PARAM_PORT MXS_JSON_PTR_PARAMETERS "/port"
#define MXS_JSON_PTR_PARAM_ADDRESS MXS_JSON_PTR_PARAMETERS "/address"
#define MXS_JSON_PTR_PARAM_PROTOCOL MXS_JSON_PTR_PARAMETERS "/protocol"
#define MXS_JSON_PTR_PARAM_AUTHENTICATOR MXS_JSON_PTR_PARAMETERS "/authenticator"
#define MXS_JSON_PTR_PARAM_AUTHENTICATOR_OPTIONS MXS_JSON_PTR_PARAMETERS "/authenticator_options"
#define MXS_JSON_PTR_PARAM_SSL_KEY MXS_JSON_PTR_PARAMETERS "/ssl_key"
#define MXS_JSON_PTR_PARAM_SSL_CERT MXS_JSON_PTR_PARAMETERS "/ssl_cert"
#define MXS_JSON_PTR_PARAM_SSL_CA_CERT MXS_JSON_PTR_PARAMETERS "/ssl_ca_cert"
#define MXS_JSON_PTR_PARAM_SSL_VERSION MXS_JSON_PTR_PARAMETERS "/ssl_version"
#define MXS_JSON_PTR_PARAM_SSL_CERT_VERIFY_DEPTH MXS_JSON_PTR_PARAMETERS "/ssl_cert_verify_depth"
#define MXS_JSON_PTR_PARAM_SESSION_TRACK_TRX_STATE MXS_JSON_PTR_PARAMETERS "/session_track_trx_state"
#define MXS_JSON_PTR_PARAM_PORT MXS_JSON_PTR_PARAMETERS "/port"
#define MXS_JSON_PTR_PARAM_ADDRESS MXS_JSON_PTR_PARAMETERS "/address"
#define MXS_JSON_PTR_PARAM_PROTOCOL MXS_JSON_PTR_PARAMETERS "/protocol"
#define MXS_JSON_PTR_PARAM_AUTHENTICATOR MXS_JSON_PTR_PARAMETERS "/authenticator"
#define MXS_JSON_PTR_PARAM_AUTHENTICATOR_OPTIONS MXS_JSON_PTR_PARAMETERS "/authenticator_options"
#define MXS_JSON_PTR_PARAM_SSL_KEY MXS_JSON_PTR_PARAMETERS "/ssl_key"
#define MXS_JSON_PTR_PARAM_SSL_CERT MXS_JSON_PTR_PARAMETERS "/ssl_cert"
#define MXS_JSON_PTR_PARAM_SSL_CA_CERT MXS_JSON_PTR_PARAMETERS "/ssl_ca_cert"
#define MXS_JSON_PTR_PARAM_SSL_VERSION MXS_JSON_PTR_PARAMETERS "/ssl_version"
#define MXS_JSON_PTR_PARAM_SSL_CERT_VERIFY_DEPTH MXS_JSON_PTR_PARAMETERS "/ssl_cert_verify_depth"
/** Non-parameter JSON pointers */
#define MXS_JSON_PTR_MODULE "/data/attributes/module"

View File

@ -199,6 +199,7 @@ typedef enum
} DCB_USAGE;
/* A few useful macros */
#define DCB_SERVICE(x, type) (type *)((x)->service)
#define DCB_SESSION(x) (x)->session
#define DCB_PROTOCOL(x, type) (type *)((x)->protocol)
#define DCB_WRITEQLEN(x) (x)->writeqlen

View File

@ -49,7 +49,6 @@ typedef struct servlistener
SPINLOCK lock;
int active; /**< True if the port has not been deleted */
struct servlistener *next; /**< Next service protocol */
bool session_track_trx_state; /** Get transation state from backend */
} SERV_LISTENER; // TODO: Rename to LISTENER
typedef struct listener_iterator
@ -80,7 +79,7 @@ json_t* listener_to_json(const SERV_LISTENER* listener);
SERV_LISTENER* listener_alloc(struct service* service, const char* name, const char *protocol,
const char *address, unsigned short port, const char *authenticator,
const char* auth_options, SSL_LISTENER *ssl, bool get_trx_state_from_backend);
const char* auth_options, SSL_LISTENER *ssl);
void listener_free(SERV_LISTENER* listener);
int listener_set_ssl_version(SSL_LISTENER *ssl_listener, char* version);
void listener_set_certificates(SSL_LISTENER *ssl_listener, char* cert, char* key, char* ca_cert);

View File

@ -353,7 +353,6 @@ typedef struct
int ignore_replies; /*< How many replies should be discarded */
GWBUF* stored_query; /*< Temporarily stored queries */
bool collect_result; /*< Collect the next result set as one buffer */
bool session_track_trx_state; /*< Get transation state from backend */
#if defined(SS_DEBUG)
skygw_chk_t protocol_chk_tail;
#endif
@ -494,7 +493,7 @@ void init_response_status(GWBUF* buf, uint8_t cmd, int* npackets, size_t* nbytes
bool read_complete_packet(DCB *dcb, GWBUF **readbuf);
bool gw_get_shared_session_auth_info(DCB* dcb, MYSQL_session* session);
void mxs_mysql_get_session_track_info(GWBUF *buff, uint32_t server_capabilities);
mysql_tx_state_t parse_trx_state(char *str);
mysql_tx_state_t parse_trx_state(const char *str);
/**
* Decode server handshake

View File

@ -158,6 +158,7 @@ typedef struct service
bool log_auth_warnings; /**< Log authentication failures and warnings */
uint64_t capabilities; /**< The capabilities of the service, @see enum routing_capability */
int max_retry_interval; /**< Maximum retry interval */
bool session_track_trx_state; /**< Get transaction state via session track mechanism */
} SERVICE;
typedef enum count_spec_t

View File

@ -147,7 +147,7 @@ const char CN_USER[] = "user";
const char CN_USERS[] = "users";
const char CN_VERSION_STRING[] = "version_string";
const char CN_WEIGHTBY[] = "weightby";
const char CN_SESSION_TRACK_TRX_STATE[] = "session_track_trx_state";
const char CN_SESSION_TRACK_TRX_STATE[] = "session_track_trx_state";
typedef struct duplicate_context
{
@ -234,7 +234,6 @@ const char *config_listener_params[] =
CN_SSL_VERSION,
CN_SSL_CERT_VERIFY_DEPTH,
CN_SSL_VERIFY_PEER_CERTIFICATE,
CN_SESSION_TRACK_TRX_STATE,
NULL
};
@ -2888,7 +2887,7 @@ int create_new_service(CONFIG_CONTEXT *obj)
serviceSetVersionString(service, gateway.version_string);
}
service->session_track_trx_state = config_get_bool(obj->parameters, CN_SESSION_TRACK_TRX_STATE);
/** Store the configuration parameters for the service */
const MXS_MODULE *mod = get_module(router, MODULE_ROUTER);
@ -3371,7 +3370,7 @@ int create_new_listener(CONFIG_CONTEXT *obj)
char *socket = config_get_value(obj->parameters, CN_SOCKET);
char *authenticator = config_get_value(obj->parameters, CN_AUTHENTICATOR);
char *authenticator_options = config_get_value(obj->parameters, CN_AUTHENTICATOR_OPTIONS);
bool session_track_trx_state = config_get_bool(obj->parameters, CN_SESSION_TRACK_TRX_STATE);
if (raw_service_name && protocol && (socket || port))
{
char service_name[strlen(raw_service_name) + 1];
@ -3393,7 +3392,7 @@ int create_new_listener(CONFIG_CONTEXT *obj)
else
{
serviceCreateListener(service, obj->object, protocol, socket, 0,
authenticator, authenticator_options, ssl_info, session_track_trx_state);
authenticator, authenticator_options, ssl_info);
}
}
@ -3410,7 +3409,7 @@ int create_new_listener(CONFIG_CONTEXT *obj)
else
{
serviceCreateListener(service, obj->object, protocol, address, atoi(port),
authenticator, authenticator_options, ssl_info, session_track_trx_state);
authenticator, authenticator_options, ssl_info);
}
}

View File

@ -795,7 +795,7 @@ bool runtime_create_listener(SERVICE *service, const char *name, const char *add
const char *port, const char *proto, const char *auth,
const char *auth_opt, const char *ssl_key,
const char *ssl_cert, const char *ssl_ca,
const char *ssl_version, const char *ssl_depth, bool session_track_trx_state)
const char *ssl_version, const char *ssl_depth)
{
if (addr == NULL || strcasecmp(addr, CN_DEFAULT) == 0)
@ -842,7 +842,7 @@ bool runtime_create_listener(SERVICE *service, const char *name, const char *add
{
const char *print_addr = addr ? addr : "::";
SERV_LISTENER *listener = serviceCreateListener(service, name, proto, addr,
u_port, auth, auth_opt, ssl, session_track_trx_state);
u_port, auth, auth_opt, ssl);
if (listener && listener_serialize(listener))
{
@ -1961,12 +1961,11 @@ bool runtime_create_listener_from_json(SERVICE* service, json_t* json)
const char* ssl_ca_cert = get_string_or_null(json, MXS_JSON_PTR_PARAM_SSL_CA_CERT);
const char* ssl_version = get_string_or_null(json, MXS_JSON_PTR_PARAM_SSL_VERSION);
const char* ssl_cert_verify_depth = get_string_or_null(json, MXS_JSON_PTR_PARAM_SSL_CERT_VERIFY_DEPTH);
bool session_track_trx_state = is_bool_or_null(json, MXS_JSON_PTR_PARAM_SESSION_TRACK_TRX_STATE);
rval = runtime_create_listener(service, id, address, port.c_str(), protocol,
authenticator, authenticator_options,
ssl_key, ssl_cert, ssl_ca_cert, ssl_version,
ssl_cert_verify_depth, session_track_trx_state);
ssl_cert_verify_depth);
}
return rval;

View File

@ -166,8 +166,7 @@ bool runtime_create_listener(SERVICE *service, const char *name, const char *add
const char *port, const char *proto, const char *auth,
const char *auth_opt, const char *ssl_key,
const char *ssl_cert, const char *ssl_ca,
const char *ssl_version, const char *ssl_depth,
bool session_track_trx_state);
const char *ssl_version, const char *ssl_depth);
/**
* @brief Destroy a listener

View File

@ -84,7 +84,7 @@ bool service_thread_init();
SERV_LISTENER* serviceCreateListener(SERVICE *service, const char *name,
const char *protocol, const char *address,
unsigned short port, const char *authenticator,
const char *options, SSL_LISTENER *ssl, bool session_track_trx_state);
const char *options, SSL_LISTENER *ssl);
void serviceRemoveBackend(SERVICE *service, const SERVER *server);

View File

@ -62,7 +62,7 @@ static RSA *tmp_rsa_callback(SSL *s, int is_export, int keylength);
SERV_LISTENER *
listener_alloc(struct service* service, const char* name, const char *protocol,
const char *address, unsigned short port, const char *authenticator,
const char* auth_options, SSL_LISTENER *ssl, bool session_track_trx_state)
const char* auth_options, SSL_LISTENER *ssl)
{
char *my_address = NULL;
if (address)
@ -135,7 +135,6 @@ listener_alloc(struct service* service, const char* name, const char *protocol,
proto->users = NULL;
proto->next = NULL;
proto->auth_instance = auth_instance;
proto->session_track_trx_state = session_track_trx_state;
spinlock_init(&proto->lock);
return proto;

View File

@ -755,10 +755,10 @@ static void service_add_listener(SERVICE* service, SERV_LISTENER* proto)
*/
SERV_LISTENER* serviceCreateListener(SERVICE *service, const char *name, const char *protocol,
const char *address, unsigned short port, const char *authenticator,
const char *options, SSL_LISTENER *ssl, bool session_track_trx_state)
const char *options, SSL_LISTENER *ssl)
{
SERV_LISTENER *proto = listener_alloc(service, name, protocol, address,
port, authenticator, options, ssl, session_track_trx_state);
port, authenticator, options, ssl);
if (proto)
{

View File

@ -71,7 +71,7 @@ test1()
ss_dfprintf(stderr, "\t..done\nAdding protocol testprotocol.");
set_libdir(MXS_STRDUP_A("../../modules/authenticator/MySQLAuth/"));
ss_info_dassert(serviceCreateListener(service, "TestProtocol", "testprotocol",
"localhost", 9876, "MySQLAuth", NULL, NULL, false),
"localhost", 9876, "MySQLAuth", NULL, NULL),
"Add Protocol should succeed");
ss_info_dassert(0 != serviceHasListener(service, "TestProtocol", "testprotocol", "localhost", 9876),
"Service should have new protocol as requested");

View File

@ -755,6 +755,9 @@ gw_read_and_write(DCB *dcb)
bool result_collected = false;
MySQLProtocol *proto = (MySQLProtocol *)dcb->protocol;
/** Get sesion track info from ok packet and save it to gwbuf properties */
mxs_mysql_get_session_track_info(read_buffer, proto->server_capabilities);
if (rcap_type_required(capabilities, RCAP_TYPE_PACKET_OUTPUT) ||
rcap_type_required(capabilities, RCAP_TYPE_CONTIGUOUS_OUTPUT) ||
proto->ignore_replies != 0)
@ -977,8 +980,6 @@ gw_read_and_write(DCB *dcb)
gwbuf_set_type(stmt, GWBUF_TYPE_RESULT);
}
mxs_mysql_get_session_track_info(stmt, proto->server_capabilities);
session->service->router->clientReply(session->service->router_instance,
session->router_session,
stmt, dcb);

View File

@ -412,7 +412,8 @@ int MySQLSendHandshake(DCB* dcb)
int gw_MySQLWrite_client(DCB *dcb, GWBUF *queue)
{
MySQLProtocol *protocol = DCB_PROTOCOL(dcb, MySQLProtocol);
if (GWBUF_IS_REPLY_OK(queue) && protocol->session_track_trx_state)
SERVICE *service = DCB_SERVICE(dcb, SERVICE);
if (GWBUF_IS_REPLY_OK(queue) && service->session_track_trx_state)
{
parse_and_set_trx_state(dcb->session, queue);
}
@ -1271,8 +1272,6 @@ int gw_MySQLListener(DCB *listen_dcb, char *config_bind)
int gw_MySQLAccept(DCB *listener)
{
DCB *client_dcb;
MySQLProtocol *cli_proto;
SERV_LISTENER *listener_proto;
CHK_DCB(listener);
@ -1285,9 +1284,6 @@ int gw_MySQLAccept(DCB *listener)
while ((client_dcb = dcb_accept(listener)) != NULL)
{
gw_process_one_new_client(client_dcb);
cli_proto = DCB_PROTOCOL(client_dcb, MySQLProtocol);
listener_proto = (SERV_LISTENER *)listener->listener;
cli_proto->session_track_trx_state = listener_proto->session_track_trx_state;
} /**< while client_dcb != NULL */
}
@ -1519,8 +1515,8 @@ static int route_by_statement(MXS_SESSION* session, uint64_t capabilities, GWBUF
}
}
MySQLProtocol *protocol = DCB_PROTOCOL(session->client_dcb, MySQLProtocol);
if (rcap_type_required(capabilities, RCAP_TYPE_TRANSACTION_TRACKING) && !protocol->session_track_trx_state)
SERVICE *service = DCB_SERVICE(session->client_dcb, SERVICE);
if (rcap_type_required(capabilities, RCAP_TYPE_TRANSACTION_TRACKING) && service->session_track_trx_state)
{
if (session_trx_is_ending(session))
{
@ -1900,9 +1896,23 @@ static bool parse_kill_query(char *query, uint64_t *thread_id_out, kill_type_t *
}
}
/*
* Mapping two session tracker's info to mxs_session_trx_state_t
* SESSION_TRACK_STATE_CHANGE:
* When session variable autocommit is changed, it will give a latest value {0|1}
* SESSION_TRACK_TRANSACTION_TYPE:
* TX_EMPTY => SESSION_TRX_INACTIVE
* TX_WRITE_TRX => SESSION_TRX_READ_WRITE
* TX_READ_TRX => SESSION_TRX_READ_ONLY
* TX_EXPLICIT | TX_IMPLICIT => SESSION_TRX_ACTIVE
* refs:
* 1. https://dev.mysql.com/worklog/task/?id=6885
* 2. https://dev.mysql.com/worklog/task/?id=6631
*/
static void parse_and_set_trx_state(MXS_SESSION *ses, GWBUF *data)
{
char *autocommit = gwbuf_get_property(data, (char *)"autocommit");
if (autocommit)
{
if (strncasecmp(autocommit, "0", 1) == 0)
@ -1924,17 +1934,17 @@ static void parse_and_set_trx_state(MXS_SESSION *ses, GWBUF *data)
{
session_set_trx_state(ses, SESSION_TRX_INACTIVE);
}
else if(s & TX_READ_TRX)
else if (s & TX_READ_TRX)
{
session_set_trx_state(ses, SESSION_TRX_READ_ONLY);
}
else if(s & TX_WRITE_TRX)
else if (s & TX_WRITE_TRX)
{
session_set_trx_state(ses, SESSION_TRX_READ_WRITE);
}
else if((s & TX_EXPLICIT) | (s & TX_IMPLICIT))
else if ((s & TX_EXPLICIT) || (s & TX_IMPLICIT))
{
session_set_trx_state(ses, SESSION_TRX_ACTIVE);
}
}
}
}

View File

@ -1778,33 +1778,28 @@ void mxs_mysql_execute_kill_user(MXS_SESSION* issuer, const char* user, kill_typ
void mxs_mysql_get_session_track_info(GWBUF *buff, uint32_t server_capabilities)
{
char *trx_info, *var_name, *var_value;
if (GWBUF_LENGTH(buff) < 9 || !mxs_mysql_is_ok_packet(buff))
size_t len = GWBUF_LENGTH(buff);
uint8_t local_buf[len];
uint8_t *ptr = local_buf;
if (len < MYSQL_OK_PACKET_MIN_LEN || !mxs_mysql_is_ok_packet(buff))
{
return;
}
if (!GWBUF_IS_CONTIGUOUS(buff))
{
buff = gwbuf_make_contiguous(buff);
}
gwbuf_copy_data(buff, 0, len, local_buf);
buff->gwbuf_type |= GWBUF_TYPE_REPLY_OK;
uint8_t* ptr = GWBUF_DATA(buff);
ptr += (MYSQL_COM_OFFSET + 1); // Header and Command type
mxs_leint_consume(&ptr); // Affected rows
mxs_leint_consume(&ptr); // Last insert-id
ptr += (MYSQL_HEADER_LEN + 1); // Header and Command type
mxs_leint_consume(&ptr); // Affected rows
mxs_leint_consume(&ptr); // Last insert-id
uint16_t server_status = gw_mysql_get_byte2(ptr);
ptr += 2; // status
ptr += 2; // number of warnings
if (server_capabilities & GW_MYSQL_CAPABILITIES_SESSION_TRACK)
{
if (ptr < buff->end) {
if (ptr < buff->end)
{
ptr += mxs_leint_consume(&ptr); // info
if (server_status & SERVER_SESSION_STATE_CHANGED)
{
@ -1832,7 +1827,7 @@ void mxs_mysql_get_session_track_info(GWBUF *buff, uint32_t server_capabilities)
case SESSION_TRACK_TRANSACTION_TYPE:
mxs_leint_consume(&ptr); // length
trx_info = mxs_lestr_consume_dup(&ptr);
MXS_INFO("get trx_info:%s", trx_info);
MXS_DEBUG("get trx_info:%s", trx_info);
gwbuf_add_property(buff, (char *)"trx_state", trx_info);
MXS_FREE(trx_info);
break;
@ -1847,10 +1842,16 @@ void mxs_mysql_get_session_track_info(GWBUF *buff, uint32_t server_capabilities)
}
}
mysql_tx_state_t parse_trx_state(char *str)
/***
* As described in https://dev.mysql.com/worklog/task/?id=6631
* When session transation state changed
* SESSION_TRACK_TRANSACTION_TYPE (or SESSION_TRACK_TRANSACTION_STATE in MySQL) will
* return an 8 bytes string to indicate the transaction state details
* */
mysql_tx_state_t parse_trx_state(const char *str)
{
int s = TX_EMPTY;
assert(str);
ss_dassert(str);
do
{
switch (*str)
@ -1889,4 +1890,4 @@ mysql_tx_state_t parse_trx_state(char *str)
} while(*(str++) != 0);
return (mysql_tx_state_t)s;
}
}

View File

@ -1169,7 +1169,7 @@ static void createListener(DCB *dcb, SERVICE *service, char *name, char *address
{
if (runtime_create_listener(service, name, address, port, protocol,
authenticator, authenticator_options,
key, cert, ca, version, depth, false))
key, cert, ca, version, depth))
{
dcb_printf(dcb, "Listener '%s' created\n", name);
}