Merge pull request #6 from skysql/MAX-99

Max 99
This commit is contained in:
Vilho Raatikka 2014-06-25 18:06:22 +03:00
commit 22827e7c8d
16 changed files with 1270 additions and 484 deletions

View File

@ -101,7 +101,8 @@ static int is_autocommit_stmt(
*/
skygw_query_type_t skygw_query_classifier_get_type(
const char* query,
unsigned long client_flags)
unsigned long client_flags,
MYSQL** p_mysql)
{
MYSQL* mysql;
char* query_str;
@ -129,9 +130,13 @@ skygw_query_type_t skygw_query_classifier_get_type(
mysql_error(mysql))));
mysql_library_end();
goto return_without_server;
goto return_qtype;
}
if (p_mysql != NULL)
{
*p_mysql = mysql;
}
/** Set methods and authentication to mysql */
mysql_options(mysql, MYSQL_READ_DEFAULT_GROUP, "libmysqld_skygw");
mysql_options(mysql, MYSQL_OPT_USE_EMBEDDED_CONNECTION, NULL);
@ -143,28 +148,42 @@ skygw_query_type_t skygw_query_classifier_get_type(
/** Get one or create new THD object to be use in parsing */
thd = get_or_create_thd_for_parsing(mysql, query_str);
if (thd == NULL) {
goto return_with_server_handle;
if (thd == NULL)
{
skygw_query_classifier_free(mysql);
}
/** Create parse_tree inside thd */
failp = create_parse_tree(thd);
if (failp) {
goto return_with_thd;
if (failp)
{
skygw_query_classifier_free(mysql);
*p_mysql = NULL;
}
qtype = resolve_query_type(thd);
return_with_thd:
(*mysql->methods->free_embedded_thd)(mysql);
mysql->thd = 0;
return_with_server_handle:
mysql_close(mysql);
mysql_thread_end();
return_without_server:
if (p_mysql == NULL)
{
skygw_query_classifier_free(mysql);
}
return_qtype:
return qtype;
}
void skygw_query_classifier_free(
MYSQL* mysql)
{
if (mysql->thd != NULL)
{
(*mysql->methods->free_embedded_thd)(mysql);
mysql->thd = NULL;
}
mysql_close(mysql);
mysql_thread_end();
}
/**
* @node (write brief function description here)
@ -492,6 +511,7 @@ static skygw_query_type_t resolve_query_type(
}
/**<! fall through */
case SQLCOM_CHANGE_DB:
case SQLCOM_DEALLOCATE_PREPARE:
type |= QUERY_TYPE_SESSION_WRITE;
break;
@ -518,6 +538,11 @@ static skygw_query_type_t resolve_query_type(
goto return_qtype;
break;
case SQLCOM_PREPARE:
type |= QUERY_TYPE_PREPARE_NAMED_STMT;
goto return_qtype;
break;
default:
break;
}
@ -783,3 +808,11 @@ static int is_autocommit_stmt(
return_rc:
return rc;
}
char* skygw_query_classifier_get_stmtname(
MYSQL* mysql)
{
return ((THD *)(mysql->thd))->lex->prepared_stmt_name.str;
}

View File

@ -19,6 +19,7 @@ Copyright SkySQL Ab
/** getpid */
#include <unistd.h>
#include <mysql.h>
#include "../utils/skygw_utils.h"
EXTERN_C_BLOCK_BEGIN
@ -29,25 +30,36 @@ EXTERN_C_BLOCK_BEGIN
* is modified
*/
typedef enum {
QUERY_TYPE_UNKNOWN = 0x000, /*< Initial value, can't be tested bitwisely */
QUERY_TYPE_LOCAL_READ = 0x001, /*< Read non-database data, execute in MaxScale */
QUERY_TYPE_READ = 0x002, /*< No updates */
QUERY_TYPE_WRITE = 0x004, /*< Master data will be modified */
QUERY_TYPE_SESSION_WRITE = 0x008, /*< Session data will be modified */
QUERY_TYPE_GLOBAL_WRITE = 0x010, /*< Global system variable modification */
QUERY_TYPE_BEGIN_TRX = 0x020, /*< BEGIN or START TRANSACTION */
QUERY_TYPE_ENABLE_AUTOCOMMIT = 0x040,/*< SET autocommit=1 */
QUERY_TYPE_DISABLE_AUTOCOMMIT = 0x080,/*< SET autocommit=0 */
QUERY_TYPE_ROLLBACK = 0x100, /*< ROLLBACK */
QUERY_TYPE_COMMIT = 0x200 /*< COMMIT */
QUERY_TYPE_UNKNOWN = 0x0000, /*< Initial value, can't be tested bitwisely */
QUERY_TYPE_LOCAL_READ = 0x0001, /*< Read non-database data, execute in MaxScale */
QUERY_TYPE_READ = 0x0002, /*< No updates */
QUERY_TYPE_WRITE = 0x0004, /*< Master data will be modified */
QUERY_TYPE_SESSION_WRITE = 0x0008, /*< Session data will be modified */
QUERY_TYPE_GLOBAL_WRITE = 0x0010, /*< Global system variable modification */
QUERY_TYPE_BEGIN_TRX = 0x0020, /*< BEGIN or START TRANSACTION */
QUERY_TYPE_ENABLE_AUTOCOMMIT = 0x0040, /*< SET autocommit=1 */
QUERY_TYPE_DISABLE_AUTOCOMMIT = 0x0080, /*< SET autocommit=0 */
QUERY_TYPE_ROLLBACK = 0x0100, /*< ROLLBACK */
QUERY_TYPE_COMMIT = 0x0200, /*< COMMIT */
QUERY_TYPE_PREPARE_NAMED_STMT = 0x0400, /*< Prepared stmt with name from user */
QUERY_TYPE_PREPARE_STMT = 0x0800, /*< Prepared stmt with id provided by server */
QUERY_TYPE_EXEC_STMT = 0x1000 /*< Execute prepared statement */
} skygw_query_type_t;
#define QUERY_IS_TYPE(mask,type) ((mask & type) == type)
/**
* Create THD and use it for creating parse tree. Examine parse tree and
* classify the query.
*/
skygw_query_type_t skygw_query_classifier_get_type(
const char* query_str,
unsigned long client_flags);
unsigned long client_flags,
MYSQL** mysql);
/** Free THD context and close MYSQL */
void skygw_query_classifier_free(MYSQL* mysql);
char* skygw_query_classifier_get_stmtname(MYSQL* mysql);
EXTERN_C_BLOCK_END

View File

@ -186,30 +186,28 @@ GWBUF *gwbuf_clone_transform(
goto return_clonebuf;
}
switch (src_type)
if (GWBUF_IS_TYPE_MYSQL(head))
{
case GWBUF_TYPE_MYSQL:
if (targettype == GWBUF_TYPE_PLAINSQL)
{
/** Crete reference to string part of buffer */
clonebuf = gwbuf_clone_portion(
head,
5,
GWBUF_LENGTH(head)-5);
ss_dassert(clonebuf != NULL);
/** Overwrite the type with new format */
clonebuf->gwbuf_type = targettype;
}
else
{
clonebuf = NULL;
}
break;
default:
if (GWBUF_TYPE_PLAINSQL == targettype)
{
/** Crete reference to string part of buffer */
clonebuf = gwbuf_clone_portion(
head,
5,
GWBUF_LENGTH(head)-5);
ss_dassert(clonebuf != NULL);
/** Overwrite the type with new format */
gwbuf_set_type(clonebuf, targettype);
}
else
{
clonebuf = NULL;
break;
} /*< switch (src_type) */
}
}
else
{
clonebuf = NULL;
}
return_clonebuf:
return clonebuf;
@ -329,6 +327,7 @@ bool gwbuf_set_type(
case GWBUF_TYPE_MYSQL:
case GWBUF_TYPE_PLAINSQL:
case GWBUF_TYPE_UNDEFINED:
case GWBUF_TYPE_SINGLE_STMT: /*< buffer contains one stmt */
buf->gwbuf_type |= type;
succp = true;
break;

View File

@ -301,8 +301,9 @@ dcb_final_free(DCB *dcb)
DCB_CALLBACK *cb;
CHK_DCB(dcb);
ss_info_dassert(dcb->state == DCB_STATE_DISCONNECTED,
"dcb not in DCB_STATE_DISCONNECTED state.");
ss_info_dassert(dcb->state == DCB_STATE_DISCONNECTED ||
dcb->state == DCB_STATE_ALLOC,
"dcb not in DCB_STATE_DISCONNECTED not in DCB_STATE_ALLOC state.");
/*< First remove this DCB from the chain */
spinlock_acquire(&dcbspin);
@ -701,6 +702,11 @@ int dcb_read(
n = 0;
goto return_n;
}
else if (b == 0)
{
n = 0;
goto return_n;
}
bufsize = MIN(b, MAX_BUFFER_SIZE);
if ((buffer = gwbuf_alloc(bufsize)) == NULL)

View File

@ -116,7 +116,8 @@ MODULE_INFO *mod_info = NULL;
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Unable to load library for module: "
"%s\n\t\t\t %s.",
"%s\n\n\t\t %s."
"\n\n",
module,
dlerror())));
return NULL;

View File

@ -127,7 +127,8 @@ session_alloc(SERVICE *service, DCB *client_dcb)
* session, therefore it is important that the session lock is
* relinquished beforethe router call.
*/
if (client_dcb->state != DCB_STATE_LISTENING && client_dcb->dcb_role != DCB_ROLE_INTERNAL)
if (client_dcb->state != DCB_STATE_LISTENING &&
client_dcb->dcb_role != DCB_ROLE_INTERNAL)
{
session->router_session =
service->router->newSession(service->router_instance,
@ -196,14 +197,28 @@ session_alloc(SERVICE *service, DCB *client_dcb)
}
spinlock_acquire(&session_spin);
session->state = SESSION_STATE_ROUTER_READY;
session->next = allSessions;
allSessions = session;
spinlock_release(&session_spin);
atomic_add(&service->stats.n_sessions, 1);
atomic_add(&service->stats.n_current, 1);
CHK_SESSION(session);
if (session->state != SESSION_STATE_READY)
{
session_free(session);
client_dcb->session = NULL;
session = NULL;
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Failed to create %s session.",
service->name)));
spinlock_release(&session_spin);
}
else
{
session->state = SESSION_STATE_ROUTER_READY;
session->next = allSessions;
allSessions = session;
spinlock_release(&session_spin);
atomic_add(&service->stats.n_sessions, 1);
atomic_add(&service->stats.n_current, 1);
CHK_SESSION(session);
}
return_session:
return session;
}
@ -310,9 +325,6 @@ bool session_free(
/* Free router_session and session */
if (session->router_session) {
session->service->router->closeSession(
session->service->router_instance,
session->router_session);
session->service->router->freeSession(
session->service->router_instance,
session->router_session);

View File

@ -46,13 +46,16 @@
typedef enum
{
GWBUF_TYPE_UNDEFINED = 0x00,
GWBUF_TYPE_PLAINSQL = 0x01,
GWBUF_TYPE_MYSQL = 0x02
GWBUF_TYPE_UNDEFINED = 0x00,
GWBUF_TYPE_PLAINSQL = 0x01,
GWBUF_TYPE_MYSQL = 0x02,
GWBUF_TYPE_SINGLE_STMT = 0x04
} gwbuf_type_t;
#define GWBUF_IS_TYPE_PLAINSQL(b) (b->gwbuf_type & GWBUF_TYPE_PLAINSQL)
#define GWBUF_IS_TYPE_MYSQL(b) (b->gwbuf_type & GWBUF_TYPE_MYSQL)
#define GWBUF_IS_TYPE_UNDEFINED(b) (b->gwbuf_type == 0)
#define GWBUF_IS_TYPE_PLAINSQL(b) (b->gwbuf_type & GWBUF_TYPE_PLAINSQL)
#define GWBUF_IS_TYPE_MYSQL(b) (b->gwbuf_type & GWBUF_TYPE_MYSQL)
#define GWBUF_IS_TYPE_SINGLE_STMT(b) (b->gwbuf_type & GWBUF_TYPE_SINGLE_STMT)
/**
* A structure to encapsulate the data in a form that the data itself can be

View File

@ -229,7 +229,6 @@ typedef struct dcb {
struct service *service; /**< The related service */
void *data; /**< Specific client data */
DCBMM memdata; /**< The data related to DCB memory management */
int command; /**< Specific client command type */
SPINLOCK cb_lock; /**< The lock for the callbacks linked list */
DCB_CALLBACK *callbacks; /**< The list of callbacks for the DCB */

View File

@ -77,7 +77,7 @@
#define GW_SCRAMBLE_LENGTH_323 8
#ifndef MYSQL_SCRAMBLE_LEN
#define MYSQL_SCRAMBLE_LEN GW_MYSQL_SCRAMBLE_SIZE
# define MYSQL_SCRAMBLE_LEN GW_MYSQL_SCRAMBLE_SIZE
#endif
#define GW_NOINTR_CALL(A) do { errno = 0; A; } while (errno == EINTR)
@ -92,41 +92,15 @@
struct dcb;
typedef enum {
MYSQL_ALLOC,
MYSQL_PENDING_CONNECT,
MYSQL_CONNECTED,
MYSQL_AUTH_SENT,
MYSQL_AUTH_RECV,
MYSQL_AUTH_FAILED,
MYSQL_IDLE,
MYSQL_ROUTING,
MYSQL_WAITING_RESULT,
MYSQL_SESSION_CHANGE
} mysql_pstate_t;
MYSQL_ALLOC,
MYSQL_PENDING_CONNECT,
MYSQL_CONNECTED,
MYSQL_AUTH_SENT,
MYSQL_AUTH_RECV,
MYSQL_AUTH_FAILED,
MYSQL_IDLE
} mysql_auth_state_t;
/*
* MySQL Protocol specific state data
*/
typedef struct {
#if defined(SS_DEBUG)
skygw_chk_t protocol_chk_top;
#endif
int fd; /*< The socket descriptor */
struct dcb *owner_dcb; /*< The DCB of the socket
* we are running on */
mysql_pstate_t state; /*< Current protocol state */
uint8_t scramble[MYSQL_SCRAMBLE_LEN]; /*< server scramble,
* created or received */
uint32_t server_capabilities; /*< server capabilities,
* created or received */
uint32_t client_capabilities; /*< client capabilities,
* created or received */
unsigned long tid; /*< MySQL Thread ID, in
* handshake */
#if defined(SS_DEBUG)
skygw_chk_t protocol_chk_tail;
#endif
} MySQLProtocol;
/*
* MySQL session specific data
@ -139,7 +113,6 @@ typedef struct mysql_session {
} MYSQL_session;
/** Protocol packing macros. */
#define gw_mysql_set_byte2(__buffer, __int) do { \
(__buffer)[0]= (uint8_t)((__int) & 0xFF); \
@ -230,18 +203,90 @@ typedef enum
),
} gw_mysql_capabilities_t;
/** Basic mysql commands */
#define MYSQL_COM_CHANGE_USER 0x11
#define MYSQL_COM_QUIT 0x1
#define MYSQL_COM_INIT_DB 0x2
#define MYSQL_COM_QUERY 0x3
/** Copy from enum in mariadb-5.5 mysql_com.h */
typedef enum mysql_server_cmd {
MYSQL_COM_UNDEFINED = -1,
MYSQL_COM_SLEEP = 0,
MYSQL_COM_QUIT,
MYSQL_COM_INIT_DB,
MYSQL_COM_QUERY,
MYSQL_COM_FIELD_LIST,
MYSQL_COM_CREATE_DB,
MYSQL_COM_DROP_DB,
MYSQL_COM_REFRESH,
MYSQL_COM_SHUTDOWN,
MYSQL_COM_STATISTICS,
MYSQL_COM_PROCESS_INFO,
MYSQL_COM_CONNECT,
MYSQL_COM_PROCESS_KILL,
MYSQL_COM_DEBUG,
MYSQL_COM_PING,
MYSQL_COM_TIME,
MYSQL_COM_DELAYED_INSERT,
MYSQL_COM_CHANGE_USER,
MYSQL_COM_BINLOG_DUMP,
MYSQL_COM_TABLE_DUMP,
MYSQL_COM_CONNECT_OUT,
MYSQL_COM_REGISTER_SLAVE,
MYSQL_COM_STMT_PREPARE,
MYSQL_COM_STMT_EXECUTE,
MYSQL_COM_STMT_SEND_LONG_DATA,
MYSQL_COM_STMT_CLOSE,
MYSQL_COM_STMT_RESET,
MYSQL_COM_SET_OPTION,
MYSQL_COM_STMT_FETCH,
MYSQL_COM_DAEMON
} mysql_server_cmd_t;
#define MYSQL_GET_COMMAND(payload) (payload[4])
#define MYSQL_GET_PACKET_NO(payload) (payload[3])
#define MYSQL_GET_PACKET_LEN(payload) (gw_mysql_get_byte3(payload))
#define MYSQL_GET_ERRCODE(payload) (gw_mysql_get_byte2(&payload[5]))
/**
* List of server commands, and number of response packets are stored here.
* server_command_t is used in MySQLProtocol structure, so for each DCB there is
* one MySQLProtocol and one server command list.
*/
typedef struct server_command_st {
mysql_server_cmd_t cmd;
int nresponse_packets; /** filled when reply arrives */
struct server_command_st* next;
} server_command_t;
/*
* MySQL Protocol specific state data
*/
typedef struct {
#if defined(SS_DEBUG)
skygw_chk_t protocol_chk_top;
#endif
int fd; /*< The socket descriptor */
struct dcb *owner_dcb; /*< The DCB of the socket
* we are running on */
SPINLOCK protocol_lock;
server_command_t protocol_command; /*< list of active commands */
mysql_auth_state_t protocol_auth_state; /*< Authentication status */
uint8_t scramble[MYSQL_SCRAMBLE_LEN]; /*< server scramble,
* created or received */
uint32_t server_capabilities; /*< server capabilities,
* created or received */
uint32_t client_capabilities; /*< client capabilities,
* created or received */
unsigned long tid; /*< MySQL Thread ID, in
* handshake */
#if defined(SS_DEBUG)
skygw_chk_t protocol_chk_tail;
#endif
} MySQLProtocol;
#define MYSQL_GET_COMMAND(payload) (payload[4])
#define MYSQL_GET_PACKET_NO(payload) (payload[3])
#define MYSQL_GET_PACKET_LEN(payload) (gw_mysql_get_byte3(payload))
#define MYSQL_GET_ERRCODE(payload) (gw_mysql_get_byte2(&payload[5]))
#define MYSQL_GET_STMTOK_NPARAM(payload) (gw_mysql_get_byte2(&payload[9]))
#define MYSQL_GET_STMTOK_NATTR(payload) (gw_mysql_get_byte2(&payload[11]))
#define MYSQL_IS_ERROR_PACKET(payload) (MYSQL_GET_COMMAND(payload)==0xff)
#endif /** _MYSQL_PROTOCOL_H */
void gw_mysql_close(MySQLProtocol **ptr);
MySQLProtocol* mysql_protocol_init(DCB* dcb, int fd);
@ -314,4 +359,14 @@ char *gw_strend(register const char *s);
int setnonblocking(int fd);
int setipaddress(struct in_addr *a, char *p);
GWBUF* gw_MySQL_get_next_packet(GWBUF** p_readbuf);
GWBUF* gw_MySQL_get_packets(GWBUF** p_readbuf, int* npackets);
GWBUF* gw_MySQL_discard_packets(GWBUF* buf, int npackets);
void protocol_add_srv_command(MySQLProtocol* p, mysql_server_cmd_t cmd);
void protocol_remove_srv_command(MySQLProtocol* p);
bool protocol_waits_response(MySQLProtocol* p);
mysql_server_cmd_t protocol_get_srv_command(MySQLProtocol* p,bool removep);
int get_stmt_nresponse_packets(GWBUF* buf, mysql_server_cmd_t cmd);
int protocol_get_nresponse_packets (MySQLProtocol* p);
bool protocol_set_nresponse_packets (MySQLProtocol* p, int nresponse_packets);

View File

@ -30,18 +30,36 @@
*/
#include <dcb.h>
#include <hashtable.h>
#undef PREP_STMT_CACHING
#if defined(PREP_STMT_CACHING)
typedef enum prep_stmt_type {
PREP_STMT_NAME,
PREP_STMT_ID
} prep_stmt_type_t;
typedef enum prep_stmt_state {
PREP_STMT_ALLOC,
PREP_STMT_SENT,
PREP_STMT_RECV,
PREP_STMT_DROPPED
} prep_stmt_state_t;
#endif /*< PREP_STMT_CACHING */
typedef enum bref_state {
BREF_NOT_USED = 0x00,
BREF_IN_USE = 0x01,
BREF_WAITING_RESULT = 0x02, /*< for anything that responds */
BREF_CLOSED = 0x04
BREF_IN_USE = 0x01,
BREF_WAITING_RESULT = 0x02, /*< for anything that responds */
BREF_CLOSED = 0x04
} bref_state_t;
#define BREF_IS_NOT_USED(s) (s->bref_state & BREF_NOT_USED)
#define BREF_IS_IN_USE(s) (s->bref_state & BREF_IN_USE)
#define BREF_IS_WAITING_RESULT(s) (s->bref_state & BREF_WAITING_RESULT)
#define BREF_IS_CLOSED(s) (s->bref_state & BREF_CLOSED)
#define BREF_IS_NOT_USED(s) (s->bref_state & ~BREF_IN_USE)
#define BREF_IS_IN_USE(s) (s->bref_state & BREF_IN_USE)
#define BREF_IS_WAITING_RESULT(s) (s->bref_state & BREF_WAITING_RESULT)
#define BREF_IS_CLOSED(s) (s->bref_state & BREF_CLOSED)
typedef enum backend_type_t {
BE_UNDEFINED=-1,
@ -186,6 +204,25 @@ typedef struct rwsplit_config_st {
} rwsplit_config_t;
#if defined(PREP_STMT_CACHING)
typedef struct prep_stmt_st {
#if defined(SS_DEBUG)
skygw_chk_t pstmt_chk_top;
#endif
union id {
int seq;
char* name;
} pstmt_id;
prep_stmt_state_t pstmt_state;
prep_stmt_type_t pstmt_type;
#if defined(SS_DEBUG)
skygw_chk_t pstmt_chk_tail;
#endif
} prep_stmt_t;
#endif /*< PREP_STMT_CACHING */
/**
* The client session structure used within this router.
*/
@ -205,7 +242,9 @@ struct router_client_session {
int rses_capabilities; /*< input type, for example */
bool rses_autocommit_enabled;
bool rses_transaction_active;
uint64_t rses_id; /*< ID for router client session */
#if defined(PREP_STMT_CACHING)
HASHTABLE* rses_prep_stmt[2];
#endif
struct router_client_session* next;
#if defined(SS_DEBUG)
skygw_chk_t rses_chk_tail;

View File

@ -163,7 +163,7 @@ static int gw_read_backend_event(DCB *dcb) {
backend_protocol = (MySQLProtocol *) dcb->protocol;
CHK_PROTOCOL(backend_protocol);
#if 1
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_read_backend_event] Read dcb %p fd %d protocol "
@ -171,8 +171,9 @@ static int gw_read_backend_event(DCB *dcb) {
pthread_self(),
dcb,
dcb->fd,
backend_protocol->state,
STRPROTOCOLSTATE(backend_protocol->state))));
backend_protocol->protocol_auth_state,
STRPROTOCOLSTATE(backend_protocol->protocol_auth_state))));
#endif
/* backend is connected:
@ -186,17 +187,18 @@ static int gw_read_backend_event(DCB *dcb) {
* If starting to auhenticate with backend server, lock dcb
* to prevent overlapping processing of auth messages.
*/
if (backend_protocol->state == MYSQL_CONNECTED) {
if (backend_protocol->protocol_auth_state == MYSQL_CONNECTED)
{
spinlock_acquire(&dcb->authlock);
backend_protocol = (MySQLProtocol *) dcb->protocol;
CHK_PROTOCOL(backend_protocol);
if (backend_protocol->state == MYSQL_CONNECTED) {
if (gw_read_backend_handshake(backend_protocol) != 0) {
backend_protocol->state = MYSQL_AUTH_FAILED;
if (backend_protocol->protocol_auth_state == MYSQL_CONNECTED)
{
if (gw_read_backend_handshake(backend_protocol) != 0)
{
backend_protocol->protocol_auth_state = MYSQL_AUTH_FAILED;
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_read_backend_event] after "
@ -205,7 +207,9 @@ static int gw_read_backend_event(DCB *dcb) {
pthread_self(),
backend_protocol->owner_dcb->fd)));
} else {
}
else
{
/* handshake decoded, send the auth credentials */
if (gw_send_authentication_to_backend(
current_session->db,
@ -213,7 +217,7 @@ static int gw_read_backend_event(DCB *dcb) {
current_session->client_sha1,
backend_protocol) != 0)
{
backend_protocol->state = MYSQL_AUTH_FAILED;
backend_protocol->protocol_auth_state = MYSQL_AUTH_FAILED;
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_read_backend_event] after "
@ -221,31 +225,31 @@ static int gw_read_backend_event(DCB *dcb) {
"fd %d, state = MYSQL_AUTH_FAILED.",
pthread_self(),
backend_protocol->owner_dcb->fd)));
} else {
backend_protocol->state = MYSQL_AUTH_RECV;
}
else
{
backend_protocol->protocol_auth_state = MYSQL_AUTH_RECV;
}
}
}
spinlock_release(&dcb->authlock);
}
/*
* Now:
* -- check the authentication reply from backend
* OR
* -- handle a previous handshake error
*/
if (backend_protocol->state == MYSQL_AUTH_RECV ||
backend_protocol->state == MYSQL_AUTH_FAILED)
if (backend_protocol->protocol_auth_state == MYSQL_AUTH_RECV ||
backend_protocol->protocol_auth_state == MYSQL_AUTH_FAILED)
{
spinlock_acquire(&dcb->authlock);
backend_protocol = (MySQLProtocol *) dcb->protocol;
CHK_PROTOCOL(backend_protocol);
if (backend_protocol->state == MYSQL_AUTH_RECV ||
backend_protocol->state == MYSQL_AUTH_FAILED)
if (backend_protocol->protocol_auth_state == MYSQL_AUTH_RECV ||
backend_protocol->protocol_auth_state == MYSQL_AUTH_FAILED)
{
ROUTER_OBJECT *router = NULL;
ROUTER *router_instance = NULL;
@ -259,7 +263,7 @@ static int gw_read_backend_event(DCB *dcb) {
router_instance = session->service->router_instance;
rsession = session->router_session;
if (backend_protocol->state == MYSQL_AUTH_RECV) {
if (backend_protocol->protocol_auth_state == MYSQL_AUTH_RECV) {
/*<
* Read backed auth reply
*/
@ -268,14 +272,14 @@ static int gw_read_backend_event(DCB *dcb) {
switch (receive_rc) {
case -1:
backend_protocol->state = MYSQL_AUTH_FAILED;
backend_protocol->protocol_auth_state = MYSQL_AUTH_FAILED;
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_read_backend_event] after "
"gw_receive_backend_authentication "
"fd %d, state = MYSQL_AUTH_FAILED.",
backend_protocol->owner_dcb->fd,
pthread_self())));
pthread_self(),
backend_protocol->owner_dcb->fd)));
LOGIF(LE, (skygw_log_write_flush(
@ -286,7 +290,7 @@ static int gw_read_backend_event(DCB *dcb) {
current_session->user)));
break;
case 1:
backend_protocol->state = MYSQL_IDLE;
backend_protocol->protocol_auth_state = MYSQL_IDLE;
LOGIF(LD, (skygw_log_write_flush(
LOGFILE_DEBUG,
@ -316,7 +320,7 @@ static int gw_read_backend_event(DCB *dcb) {
} /* switch */
}
if (backend_protocol->state == MYSQL_AUTH_FAILED)
if (backend_protocol->protocol_auth_state == MYSQL_AUTH_FAILED)
{
/**
* protocol state won't change anymore,
@ -340,9 +344,14 @@ static int gw_read_backend_event(DCB *dcb) {
/* try reload users' table for next connection */
service_refresh_users(dcb->session->service);
#if defined(SS_DEBUG)
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Backend read error handling.")));
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_read_backend_event] "
"calling handleError. Backend "
"DCB %p, session %p",
pthread_self(),
dcb,
dcb->session)));
#endif
errbuf = mysql_create_custom_error(
@ -360,6 +369,15 @@ static int gw_read_backend_event(DCB *dcb) {
ss_dassert(!succp);
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_read_backend_event] "
"after calling handleError. Backend "
"DCB %p, session %p",
pthread_self(),
dcb,
dcb->session)));
if (session != NULL)
{
spinlock_acquire(&session->ses_lock);
@ -373,7 +391,7 @@ static int gw_read_backend_event(DCB *dcb) {
}
else
{
ss_dassert(backend_protocol->state == MYSQL_IDLE);
ss_dassert(backend_protocol->protocol_auth_state == MYSQL_IDLE);
LOGIF(LD, (skygw_log_write_flush(
LOGFILE_DEBUG,
"%lu [gw_read_backend_event] "
@ -398,44 +416,31 @@ static int gw_read_backend_event(DCB *dcb) {
/* reading MySQL command output from backend and writing to the client */
{
GWBUF *writebuf = NULL;
ROUTER_OBJECT *router = NULL;
ROUTER *router_instance = NULL;
void *rsession = NULL;
SESSION *session = dcb->session;
GWBUF *readbuf = NULL;
ROUTER_OBJECT *router = NULL;
ROUTER *router_instance = NULL;
void *rsession = NULL;
SESSION *session = dcb->session;
int nbytes_read = 0;
mysql_server_cmd_t srvcmd = MYSQL_COM_UNDEFINED;
CHK_SESSION(session);
router = session->service->router;
router_instance = session->service->router_instance;
rsession = session->router_session;
/* read available backend data */
rc = dcb_read(dcb, &writebuf);
rc = dcb_read(dcb, &readbuf);
if (rc < 0)
{
/*< vraa : errorHandle */
/*<
* Backend generated EPOLLIN event and if backend has
* failed, connection must be closed to avoid backend
* dcb from getting hanged.
*/
GWBUF* errbuf;
bool succp;
/**
* - send error for client
* - mark failed backend BREF_NOT_USED
* - go through all servers and select one according to
* the criteria that user specified in the beginning.
*/
#if defined(SS_DEBUG)
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Backend read error handling #2.")));
bool succp;
#if defined(SS_DEBUG)
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Backend read error handling #2.")));
#endif
errbuf = mysql_create_custom_error(
1,
0,
@ -458,33 +463,111 @@ static int gw_read_backend_event(DCB *dcb) {
rc = 0;
goto return_rc;
}
nbytes_read = gwbuf_length(readbuf);
if (writebuf == NULL) {
rc = 0;
if (nbytes_read == 0)
{
goto return_rc;
}
else
{
ss_dassert(readbuf != NULL);
}
/**
* ask for next response (1 or more packets) like in
* gw_MySQL_get_next_packet but gw_MySQL_get_next_response
*/
srvcmd = protocol_get_srv_command((MySQLProtocol *)dcb->protocol,
false);
/**
* If backend DCB is waiting for response to COM_STMT_PREPARE,
* it, then only that must be passed to clientReply.
*
* If response consists of ses cmd response and response to
* COM_STMT_PREPARE, there can't be anything after
* COM_STMT_PREPARE response because whole buffer may be
* discarded since router doesn't know the borderlines of MySQL
* packets.
*/
/**
* Read all packets from <readbuf> which belong to STMT PREPARE
* response.
* Move packets not belonging to STMT PREPARE response to
* dcb_readqueue.
* When whole response is read, pass <readbuf> forward to
* clientReply.
*/
if (srvcmd == MYSQL_COM_STMT_PREPARE)
{
MySQLProtocol* p;
int nresponse_packets;
GWBUF* tmpbuf;
p = (MySQLProtocol *)dcb->protocol;
nresponse_packets = protocol_get_nresponse_packets(p);
/** count only once per response */
if (nresponse_packets == 0)
{
nresponse_packets = get_stmt_nresponse_packets(
readbuf,
srvcmd);
}
tmpbuf = gw_MySQL_get_packets(&readbuf, &nresponse_packets);
gwbuf_append(dcb->dcb_readqueue, readbuf);
readbuf = tmpbuf;
/** <readbuf> contains incomplete response to STMT PREPARE */
if (nresponse_packets != 0)
{
rc = 0;
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Backend fd %d read incomplete response packet. "
"Waiting %d more, cmd %s.",
dcb->fd,
nresponse_packets,
STRPACKETTYPE(srvcmd))));
/**
* store the number of how many packets the
* reponse consists of to backend's protocol.
*/
protocol_set_nresponse_packets(p, nresponse_packets);
goto return_rc;
}
protocol_remove_srv_command((MySQLProtocol *)dcb->protocol);
}
/*<
* If dcb->session->client is freed already it may be NULL.
*/
if (dcb->session->client != NULL) {
if (dcb->session->client != NULL)
{
client_protocol = SESSION_PROTOCOL(dcb->session,
MySQLProtocol);
if (client_protocol != NULL) {
if (client_protocol != NULL)
{
CHK_PROTOCOL(client_protocol);
if (client_protocol->state == MYSQL_IDLE)
if (client_protocol->protocol_auth_state ==
MYSQL_IDLE)
{
gwbuf_set_type(writebuf, GWBUF_TYPE_MYSQL);
gwbuf_set_type(readbuf, GWBUF_TYPE_MYSQL);
router->clientReply(router_instance,
rsession,
writebuf,
readbuf,
dcb);
rc = 1;
}
goto return_rc;
} else if (dcb->session->client->dcb_role == DCB_ROLE_INTERNAL) {
gwbuf_set_type(writebuf, GWBUF_TYPE_MYSQL);
router->clientReply(router_instance, rsession, writebuf, dcb);
}
else if (dcb->session->client->dcb_role == DCB_ROLE_INTERNAL)
{
gwbuf_set_type(readbuf, GWBUF_TYPE_MYSQL);
router->clientReply(router_instance, rsession, readbuf, dcb);
rc = 1;
}
}
@ -550,8 +633,8 @@ static int gw_write_backend_event(DCB *dcb) {
goto return_rc;
}
if (backend_protocol->state == MYSQL_PENDING_CONNECT) {
backend_protocol->state = MYSQL_CONNECTED;
if (backend_protocol->protocol_auth_state == MYSQL_PENDING_CONNECT) {
backend_protocol->protocol_auth_state = MYSQL_CONNECTED;
rc = 1;
goto return_rc;
}
@ -571,7 +654,7 @@ return_rc:
}
/*
* Write function for backend DCB
* Write function for backend DCB. Store command to protocol.
*
* @param dcb The DCB of the backend
* @param queue Queue of buffers to write
@ -589,7 +672,7 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
* If auth failed, return value is 0, write and buffered write
* return 1.
*/
switch(backend_protocol->state) {
switch(backend_protocol->protocol_auth_state) {
case MYSQL_AUTH_FAILED:
{
size_t len;
@ -615,8 +698,12 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
goto return_rc;
break;
}
case MYSQL_IDLE:
{
uint8_t* ptr = GWBUF_DATA(queue);
int cmd = MYSQL_GET_COMMAND(ptr);
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_MySQLWrite_backend] write to dcb %p "
@ -624,18 +711,37 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
pthread_self(),
dcb,
dcb->fd,
STRPROTOCOLSTATE(backend_protocol->state))));
STRPROTOCOLSTATE(backend_protocol->protocol_auth_state))));
spinlock_release(&dcb->authlock);
/**
* Server commands are stored to MySQLProtocol structure
* if buffer always includes a single statement. That
* information is stored in GWBUF type field
* (GWBUF_TYPE_SINGLE_STMT bit).
*/
if (GWBUF_IS_TYPE_SINGLE_STMT(queue))
{
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Write to backend's DCB fd %d "
"cmd %s protocol state %s.",
dcb->fd,
STRPACKETTYPE(cmd),
STRPROTOCOLSTATE(backend_protocol->protocol_auth_state))));
/** Record the command to backend's protocol */
protocol_add_srv_command(backend_protocol, cmd);
}
/** Write to backend */
rc = dcb_write(dcb, queue);
goto return_rc;
break;
}
default:
/*<
* Now put the incoming data to the delay queue unless backend is
* connected with auth ok
*/
{
uint8_t* ptr = GWBUF_DATA(queue);
int cmd = MYSQL_GET_COMMAND(ptr);
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_MySQLWrite_backend] delayed write to "
@ -643,12 +749,36 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
pthread_self(),
dcb,
dcb->fd,
STRPROTOCOLSTATE(backend_protocol->state))));
STRPROTOCOLSTATE(backend_protocol->protocol_auth_state))));
/**
* Since it is known that buffer contains one complete
* command, store the command to backend's protocol. When
* backend server responses the command determines how
* response needs to be processed. This is mainly due to
* MYSQL_COM_STMT_PREPARE whose response consists of
* arbitrary number of packets.
*/
if (GWBUF_IS_TYPE_SINGLE_STMT(queue))
{
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Write to backend's delayqueue fd %d "
"protocol state %s.",
dcb->fd,
STRPROTOCOLSTATE(backend_protocol->protocol_auth_state))));
/** Record the command to backend's protocol */
protocol_add_srv_command(backend_protocol, cmd);
}
/*<
* Now put the incoming data to the delay queue unless backend is
* connected with auth ok
*/
backend_set_delayqueue(dcb, queue);
spinlock_release(&dcb->authlock);
rc = 1;
goto return_rc;
break;
}
}
return_rc:
return rc;
@ -757,7 +887,7 @@ static int gw_create_backend_connection(
case 0:
ss_dassert(fd > 0);
protocol->fd = fd;
protocol->state = MYSQL_CONNECTED;
protocol->protocol_auth_state = MYSQL_CONNECTED;
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_create_backend_connection] Established "
@ -772,7 +902,7 @@ static int gw_create_backend_connection(
case 1:
ss_dassert(fd > 0);
protocol->state = MYSQL_PENDING_CONNECT;
protocol->protocol_auth_state = MYSQL_PENDING_CONNECT;
protocol->fd = fd;
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
@ -787,7 +917,7 @@ static int gw_create_backend_connection(
default:
ss_dassert(fd == -1);
ss_dassert(protocol->state == MYSQL_ALLOC);
ss_dassert(protocol->protocol_auth_state == MYSQL_ALLOC);
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_create_backend_connection] Connection "
@ -957,7 +1087,7 @@ static int backend_write_delayqueue(DCB *dcb)
rc = dcb_write(dcb, localq);
}
if (rc == 0)
if (rc == 0)
{
GWBUF* errbuf;
bool succp;

View File

@ -500,7 +500,7 @@ static int gw_mysql_do_authentication(DCB *dcb, GWBUF *queue) {
int
gw_MySQLWrite_client(DCB *dcb, GWBUF *queue)
{
return dcb_write(dcb, queue);
return dcb_write(dcb, queue);
}
/**
@ -587,7 +587,7 @@ int gw_read_client_event(
/**
* Now there should be at least one complete mysql packet in read_buffer.
*/
switch (protocol->state) {
switch (protocol->protocol_auth_state) {
case MYSQL_AUTH_SENT:
{
@ -600,7 +600,7 @@ int gw_read_client_event(
if (auth_val == 0)
{
SESSION *session = NULL;
protocol->state = MYSQL_AUTH_RECV;
protocol->protocol_auth_state = MYSQL_AUTH_RECV;
/**
* Create session, and a router session for it.
* If successful, there will be backend connection(s)
@ -612,7 +612,8 @@ int gw_read_client_event(
{
CHK_SESSION(session);
ss_dassert(session->state != SESSION_STATE_ALLOC);
protocol->state = MYSQL_IDLE;
protocol->protocol_auth_state = MYSQL_IDLE;
/**
* Send an AUTH_OK packet to the client,
* packet sequence is # 2
@ -621,7 +622,7 @@ int gw_read_client_event(
}
else
{
protocol->state = MYSQL_AUTH_FAILED;
protocol->protocol_auth_state = MYSQL_AUTH_FAILED;
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_read_client_event] session "
@ -642,7 +643,7 @@ int gw_read_client_event(
}
else
{
protocol->state = MYSQL_AUTH_FAILED;
protocol->protocol_auth_state = MYSQL_AUTH_FAILED;
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_read_client_event] after "
@ -893,8 +894,8 @@ int gw_write_client_event(DCB *dcb)
}
protocol = (MySQLProtocol *)dcb->protocol;
CHK_PROTOCOL(protocol);
if (protocol->state == MYSQL_IDLE)
if (protocol->protocol_auth_state == MYSQL_IDLE)
{
dcb_drain_writeq(dcb);
goto return_1;
@ -1236,7 +1237,7 @@ int gw_MySQLAccept(DCB *listener)
MySQLSendHandshake(client_dcb);
// client protocol state change
protocol->state = MYSQL_AUTH_SENT;
protocol->protocol_auth_state = MYSQL_AUTH_SENT;
/**
* Set new descriptor to event set. At the same time,
@ -1294,8 +1295,15 @@ static int gw_error_client_event(
SESSION* session;
CHK_DCB(dcb);
session = dcb->session;
CHK_SESSION(session);
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_error_client_event] Error event handling for DCB %p "
"in state %s, session %p.",
pthread_self(),
dcb,
STRDCBSTATE(dcb->state),
(session != NULL ? session : NULL))));
#if defined(SS_DEBUG)
LOGIF(LE, (skygw_log_write_flush(
@ -1373,7 +1381,7 @@ gw_client_hangup_event(DCB *dcb)
/**
* Detect if buffer includes partial mysql packet or multiple packets.
* Store partial packet to pendingqueue. Send complete packets one by one
* Store partial packet to dcb_readqueue. Send complete packets one by one
* to router.
*
* It is assumed readbuf includes at least one complete packet.
@ -1392,6 +1400,20 @@ static int route_by_statement(SESSION *session, GWBUF *readbuf)
if (packetbuf != NULL)
{
CHK_GWBUF(packetbuf);
/**
* This means that buffer includes exactly one MySQL
* statement.
* backend func.write uses the information. MySQL backend
* protocol, for example, stores the command identifier
* to protocol structure. When some other thread reads
* the corresponding response the command tells how to
* handle response.
*
* Set it here instead of gw_read_client_event to make
* sure it is set to each (MySQL) packet.
*/
gwbuf_set_type(packetbuf, GWBUF_TYPE_SINGLE_STMT);
/** Route query */
rc = SESSION_ROUTE_QUERY(session, packetbuf);
}
else

View File

@ -76,7 +76,8 @@ MySQLProtocol* mysql_protocol_init(
strerror(eno))));
goto return_p;
}
p->state = MYSQL_ALLOC;
p->protocol_auth_state = MYSQL_ALLOC;
p->protocol_command.cmd = MYSQL_COM_UNDEFINED;
#if defined(SS_DEBUG)
p->protocol_chk_top = CHK_NUM_PROTOCOL;
p->protocol_chk_tail = CHK_NUM_PROTOCOL;
@ -151,7 +152,7 @@ int gw_read_backend_handshake(
if (h_len <= 4) {
/* log error this exit point */
conn->state = MYSQL_AUTH_FAILED;
conn->protocol_auth_state = MYSQL_AUTH_FAILED;
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_read_backend_handshake] after "
@ -198,7 +199,7 @@ int gw_read_backend_handshake(
* data in buffer less than expected in the
* packet. Log error this exit point
*/
conn->state = MYSQL_AUTH_FAILED;
conn->protocol_auth_state = MYSQL_AUTH_FAILED;
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_read_backend_handshake] after "
@ -223,7 +224,7 @@ int gw_read_backend_handshake(
* we cannot continue
* log error this exit point
*/
conn->state = MYSQL_AUTH_FAILED;
conn->protocol_auth_state = MYSQL_AUTH_FAILED;
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_read_backend_handshake] after "
@ -236,7 +237,7 @@ int gw_read_backend_handshake(
return 1;
}
conn->state = MYSQL_AUTH_SENT;
conn->protocol_auth_state = MYSQL_AUTH_SENT;
// consume all the data here
head = gwbuf_consume(head, GWBUF_LENGTH(head));
@ -789,13 +790,7 @@ gw_mysql_protocol_state2string (int state) {
case MYSQL_AUTH_FAILED:
return "MySQL Authentication failed";
case MYSQL_IDLE:
return "MySQL Auth done. Protocol is idle, waiting for statements";
case MYSQL_ROUTING:
return "MySQL received command has been routed to backend(s)";
case MYSQL_WAITING_RESULT:
return "MySQL Waiting for result set";
case MYSQL_SESSION_CHANGE:
return "MySQL change session";
return "MySQL authentication is succesfully done.";
default:
return "MySQL (unknown protocol state)";
}
@ -960,11 +955,9 @@ int mysql_send_custom_error (
const char *mysql_message)
{
GWBUF* buf;
int nbytes;
buf = mysql_create_custom_error(dcb, in_affected_rows, mysql_message);
buf = mysql_create_custom_error(packet_number, in_affected_rows, mysql_message);
nbytes = GWBUF_LENGTH(buf);
dcb->func.write(dcb, buf);
return GWBUF_LENGTH(buf);
@ -1500,7 +1493,7 @@ GWBUF* gw_MySQL_get_next_packet(
packetbuf = NULL;
goto return_packetbuf;
}
/** there is one complete packet in the buffer */
if (packetlen == buflen)
{
packetbuf = gwbuf_clone_portion(readbuf, 0, packetlen);
@ -1541,3 +1534,213 @@ return_packetbuf:
return packetbuf;
}
/**
* Move <npackets> from buffer pointed to by <*p_readbuf>.
*/
GWBUF* gw_MySQL_get_packets(
GWBUF** p_srcbuf,
int* npackets)
{
GWBUF* packetbuf;
GWBUF* targetbuf = NULL;
while (*npackets > 0 && (packetbuf = gw_MySQL_get_next_packet(p_srcbuf)) != NULL)
{
targetbuf = gwbuf_append(targetbuf, packetbuf);
*npackets -= 1;
}
ss_dassert(*npackets < 128);
ss_dassert(*npackets >= 0);
return targetbuf;
}
/**
* If router expects to get separate, complete statements, add MySQL command
* to MySQLProtocol structure. It is removed when response has arrived.
*/
void protocol_add_srv_command(
MySQLProtocol* p,
mysql_server_cmd_t cmd)
{
spinlock_acquire(&p->protocol_lock);
if (p->protocol_command.cmd == MYSQL_COM_UNDEFINED)
{
p->protocol_command.cmd = cmd;
p->protocol_command.nresponse_packets = 0;
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Added command %s to fd %d.",
STRPACKETTYPE(cmd),
p->owner_dcb->fd)));
}
else
{
server_command_t* c =
(server_command_t *)malloc(sizeof(server_command_t));
c->cmd = cmd;
c->nresponse_packets = 0;
c->next = NULL;
p->protocol_command.next = c;
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Added another command %s to fd %d.",
STRPACKETTYPE(cmd),
p->owner_dcb->fd)));
#if defined(SS_DEBUG)
c = &p->protocol_command;
while (c != NULL && c->cmd != MYSQL_COM_UNDEFINED)
{
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"fd %d : %d %s",
p->owner_dcb->fd,
c->cmd,
STRPACKETTYPE(c->cmd))));
c = c->next;
}
#endif
}
spinlock_release(&p->protocol_lock);
}
/**
* If router processes separate statements, every stmt has corresponding MySQL
* command stored in MySQLProtocol structure.
*
* Remove current (=oldest) command.
*/
void protocol_remove_srv_command(
MySQLProtocol* p)
{
server_command_t* s;
spinlock_acquire(&p->protocol_lock);
s = &p->protocol_command;
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Removed command %s from fd %d.",
STRPACKETTYPE(s->cmd),
p->owner_dcb->fd)));
if (s->next == NULL)
{
p->protocol_command.cmd = MYSQL_COM_UNDEFINED;
}
else
{
p->protocol_command = *(s->next);
free(s->next);
}
spinlock_release(&p->protocol_lock);
}
mysql_server_cmd_t protocol_get_srv_command(
MySQLProtocol* p,
bool removep)
{
mysql_server_cmd_t cmd;
cmd = p->protocol_command.cmd;
if (removep)
{
protocol_remove_srv_command(p);
}
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Read command %s for fd %d.",
STRPACKETTYPE(cmd),
p->owner_dcb->fd)));
return cmd;
}
/**
* Return how many packets are included in the server's response.
*/
int get_stmt_nresponse_packets(
GWBUF* buf,
mysql_server_cmd_t cmd)
{
int npackets;
uint8_t* packet;
int nparam;
int nattr;
uint8_t* data;
switch (cmd) {
case MYSQL_COM_STMT_PREPARE:
data = (uint8_t *)buf->start;
if (data[4] == 0xff)
{
npackets = 1; /*< error packet */
}
else
{
packet = (uint8_t *)GWBUF_DATA(buf);
/** ok + nparam + eof + nattr + eof */
nparam = MYSQL_GET_STMTOK_NPARAM(packet);
nattr = MYSQL_GET_STMTOK_NATTR(packet);
npackets = 1 + nparam + MIN(1, nparam) +
nattr + MIN(nattr, 1);
ss_dassert(npackets<128);
}
break;
default:
npackets = 1;
break;
}
ss_dassert(npackets<128);
return npackets;
}
int protocol_get_nresponse_packets (
MySQLProtocol* p)
{
int rval;
CHK_PROTOCOL(p);
spinlock_acquire(&p->protocol_lock);
rval = p->protocol_command.nresponse_packets;
spinlock_release(&p->protocol_lock);
ss_dassert(rval<128);
return rval;
}
bool protocol_set_nresponse_packets (
MySQLProtocol* p,
int nresponse_packets)
{
bool succp;
CHK_PROTOCOL(p);
spinlock_acquire(&p->protocol_lock);
if (p->protocol_command.nresponse_packets > 0 &&
nresponse_packets > p->protocol_command.nresponse_packets)
{
succp = false;
}
else
{
p->protocol_command.nresponse_packets = nresponse_packets;
ss_dassert(nresponse_packets<128);
succp = true;
}
spinlock_release(&p->protocol_lock);
return succp;
}

View File

@ -113,7 +113,7 @@ static void clientReply(
static void handleError(
ROUTER *instance,
void *router_session,
char *message,
GWBUF *errbuf,
DCB *backend_dcb,
int action,
bool *succp);
@ -708,7 +708,7 @@ static void
handleError(
ROUTER *instance,
void *router_session,
char *message,
GWBUF *errbuf,
DCB *backend_dcb,
int action,
bool *succp)

File diff suppressed because it is too large Load Diff

View File

@ -122,7 +122,8 @@ typedef enum skygw_chk_t {
CHK_NUM_ROUTER_PROPERTY,
CHK_NUM_SESCMD_CUR,
CHK_NUM_BACKEND,
CHK_NUM_BACKEND_REF
CHK_NUM_BACKEND_REF,
CHK_NUM_PREP_STMT
} skygw_chk_t;
# define STRBOOL(b) ((b) ? "true" : "false")
@ -140,23 +141,26 @@ typedef enum skygw_chk_t {
((i) == LOGFILE_DEBUG ? "LOGFILE_DEBUG" : \
"Unknown logfile type"))))
#define STRPACKETTYPE(p) ((p) == COM_INIT_DB ? "COM_INIT_DB" : \
((p) == COM_CREATE_DB ? "COM_CREATE_DB" : \
((p) == COM_DROP_DB ? "COM_DROP_DB" : \
((p) == COM_REFRESH ? "COM_REFRESH" : \
((p) == COM_DEBUG ? "COM_DEBUG" : \
((p) == COM_PING ? "COM_PING" : \
((p) == COM_CHANGE_USER ? "COM_CHANGE_USER" : \
((p) == COM_QUERY ? "COM_QUERY" : \
((p) == COM_SHUTDOWN ? "COM_SHUTDOWN" : \
((p) == COM_PROCESS_INFO ? "COM_PROCESS_INFO" : \
((p) == COM_CONNECT ? "COM_CONNECT" : \
((p) == COM_PROCESS_KILL ? "COM_PROCESS_KILL" : \
((p) == COM_TIME ? "COM_TIME" : \
((p) == COM_DELAYED_INSERT ? "COM_DELAYED_INSERT" : \
((p) == COM_DAEMON ? "COM_DAEMON" : \
((p) == COM_QUIT ? "COM_QUIT" : \
"UNKNOWN MYSQL PACKET TYPE"))))))))))))))))
#define STRPACKETTYPE(p) ((p) == MYSQL_COM_INIT_DB ? "COM_INIT_DB" : \
((p) == MYSQL_COM_CREATE_DB ? "COM_CREATE_DB" : \
((p) == MYSQL_COM_DROP_DB ? "COM_DROP_DB" : \
((p) == MYSQL_COM_REFRESH ? "COM_REFRESH" : \
((p) == MYSQL_COM_DEBUG ? "COM_DEBUG" : \
((p) == MYSQL_COM_PING ? "COM_PING" : \
((p) == MYSQL_COM_CHANGE_USER ? "COM_CHANGE_USER" : \
((p) == MYSQL_COM_QUERY ? "COM_QUERY" : \
((p) == MYSQL_COM_SHUTDOWN ? "COM_SHUTDOWN" : \
((p) == MYSQL_COM_PROCESS_INFO ? "COM_PROCESS_INFO" : \
((p) == MYSQL_COM_CONNECT ? "COM_CONNECT" : \
((p) == MYSQL_COM_PROCESS_KILL ? "COM_PROCESS_KILL" : \
((p) == MYSQL_COM_TIME ? "COM_TIME" : \
((p) == MYSQL_COM_DELAYED_INSERT ? "COM_DELAYED_INSERT" : \
((p) == MYSQL_COM_DAEMON ? "COM_DAEMON" : \
((p) == MYSQL_COM_QUIT ? "COM_QUIT" : \
((p) == MYSQL_COM_STMT_PREPARE ? "MYSQL_COM_STMT_PREPARE" : \
((p) == MYSQL_COM_STMT_EXECUTE ? "MYSQL_COM_STMT_EXECUTE" : \
((p) == MYSQL_COM_UNDEFINED ? "MYSQL_COM_UNDEFINED" : \
"UNKNOWN MYSQL PACKET TYPE")))))))))))))))))))
#define STRDCBSTATE(s) ((s) == DCB_STATE_ALLOC ? "DCB_STATE_ALLOC" : \
((s) == DCB_STATE_POLLING ? "DCB_STATE_POLLING" : \
@ -180,10 +184,7 @@ typedef enum skygw_chk_t {
((s) == MYSQL_AUTH_RECV ? "MYSQL_AUTH_RECV" : \
((s) == MYSQL_AUTH_FAILED ? "MYSQL_AUTH_FAILED" : \
((s) == MYSQL_IDLE ? "MYSQL_IDLE" : \
((s) == MYSQL_ROUTING ? "MYSQL_ROUTING" : \
((s) == MYSQL_WAITING_RESULT ? "MYSQL_WAITING_RESULT" : \
((s) == MYSQL_SESSION_CHANGE ? "MYSQL_SESSION_CHANGE" : \
"UNKNOWN MYSQL STATE"))))))))))
"UNKNOWN MYSQL STATE")))))))
#define STRITEMTYPE(t) ((t) == Item::FIELD_ITEM ? "FIELD_ITEM" : \
((t) == Item::FUNC_ITEM ? "FUNC_ITEM" : \
@ -478,6 +479,12 @@ typedef enum skygw_chk_t {
"Backend reference has invalid check fields"); \
}
#define CHK_PREP_STMT(p) { \
ss_info_dassert((p)->pstmt_chk_top == CHK_NUM_PREP_STMT && \
(p)->pstmt_chk_tail == CHK_NUM_PREP_STMT, \
"Prepared statement struct has invalid check fields"); \
}
#if defined(SS_DEBUG)
bool conn_open[10240];