Added support for session commands to readwrite split router.

Added support for multi-statement packets.

This is an intermediate commit to save work. Code is not cleaned and there are debug prints and prototypes to be removed.
This commit is contained in:
VilhoRaatikka
2014-03-07 20:53:33 +02:00
parent 3e111534a1
commit c28892323a
10 changed files with 921 additions and 471 deletions

View File

@ -128,10 +128,37 @@ GWBUF *rval;
rval->start = buf->start; rval->start = buf->start;
rval->end = buf->end; rval->end = buf->end;
rval->next = NULL; rval->next = NULL;
rval->command = buf->command; // rval->command = buf->command;
CHK_GWBUF(rval); CHK_GWBUF(rval);
return rval; return rval;
} }
GWBUF *gwbuf_clone_portion(
GWBUF *buf,
size_t start_offset,
size_t length)
{
GWBUF* clonebuf;
CHK_GWBUF(buf);
ss_dassert(start_offset+length <= GWBUF_LENGTH(buf));
if ((clonebuf = (GWBUF *)malloc(sizeof(GWBUF))) == NULL)
{
return NULL;
}
atomic_add(&buf->sbuf->refcount, 1);
clonebuf->sbuf = buf->sbuf;
clonebuf->start = (void *)((char*)buf->start)+start_offset;
clonebuf->end = (void *)((char *)clonebuf->start)+length;
clonebuf->next = NULL;
CHK_GWBUF(clonebuf);
return clonebuf;
}
/** /**
* Append a buffer onto a linked list of buffer structures. * Append a buffer onto a linked list of buffer structures.
* *

View File

@ -302,6 +302,8 @@ dcb_final_free(DCB *dcb)
if (dcb->remote) if (dcb->remote)
free(dcb->remote); free(dcb->remote);
bitmask_free(&dcb->memdata.bitmask); bitmask_free(&dcb->memdata.bitmask);
simple_mutex_done(&dcb->dcb_read_lock);
simple_mutex_done(&dcb->dcb_write_lock);
free(dcb); free(dcb);
} }
@ -520,6 +522,8 @@ int rc;
* Successfully connected to backend. Assign file descriptor to dcb * Successfully connected to backend. Assign file descriptor to dcb
*/ */
dcb->fd = fd; dcb->fd = fd;
/** Copy status field to DCB */
dcb->dcb_server_status = server->status;
/*< /*<
* backend_dcb is connected to backend server, and once backend_dcb * backend_dcb is connected to backend server, and once backend_dcb
@ -683,6 +687,15 @@ dcb_write(DCB *dcb, GWBUF *queue)
dcb->state != DCB_STATE_LISTENING && dcb->state != DCB_STATE_LISTENING &&
dcb->state != DCB_STATE_NOPOLLING)) dcb->state != DCB_STATE_NOPOLLING))
{ {
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [dcb_write] Write aborted to dcb %p because "
"it is in state %s",
pthread_self(),
dcb->stats.n_buffered,
dcb,
STRDCBSTATE(dcb->state),
dcb->fd)));
return 0; return 0;
} }
@ -743,7 +756,11 @@ dcb_write(DCB *dcb, GWBUF *queue)
#endif /* SS_DEBUG */ #endif /* SS_DEBUG */
len = GWBUF_LENGTH(queue); len = GWBUF_LENGTH(queue);
GW_NOINTR_CALL( GW_NOINTR_CALL(
w = gw_write(dcb->fd, GWBUF_DATA(queue), len); w = gw_write(
#if defined(SS_DEBUG)
dcb,
#endif
dcb->fd, GWBUF_DATA(queue), len);
dcb->stats.n_writes++; dcb->stats.n_writes++;
); );
@ -855,7 +872,11 @@ int saved_errno = 0;
while (dcb->writeq != NULL) while (dcb->writeq != NULL)
{ {
len = GWBUF_LENGTH(dcb->writeq); len = GWBUF_LENGTH(dcb->writeq);
GW_NOINTR_CALL(w = gw_write(dcb->fd, GW_NOINTR_CALL(w = gw_write(
#if defined(SS_DEBUG)
dcb,
#endif
dcb->fd,
GWBUF_DATA(dcb->writeq), GWBUF_DATA(dcb->writeq),
len);); len););
saved_errno = errno; saved_errno = errno;
@ -1312,6 +1333,9 @@ static bool dcb_set_state_nomutex(
} }
int gw_write( int gw_write(
#if defined(SS_DEBUG)
DCB* dcb,
#endif
int fd, int fd,
const void* buf, const void* buf,
size_t nbytes) size_t nbytes)
@ -1332,6 +1356,56 @@ int gw_write(
#else #else
w = write(fd, buf, nbytes); w = write(fd, buf, nbytes);
#endif /* SS_DEBUG && SS_TEST */ #endif /* SS_DEBUG && SS_TEST */
#if defined(SS_DEBUG)
{
size_t len;
unsigned char* packet = (unsigned char *)buf;
char* str;
/** Print only MySQL packets */
if (w > 5)
{
str = (char *)&packet[5];
len = packet[0];
len += 255*packet[1];
len += 255*255*packet[2];
if (strncmp(str, "insert", 6) == 0 ||
strncmp(str, "create", 6) == 0 ||
strncmp(str, "drop", 4) == 0)
{
ss_dassert((dcb->dcb_server_status & (SERVER_RUNNING|SERVER_MASTER|SERVER_SLAVE))==(SERVER_RUNNING|SERVER_MASTER));
}
if (strncmp(str, "set autocommit", 14) == 0 && nbytes > 17)
{
char* s = (char *)calloc(1, nbytes+1);
if (nbytes-5 > len)
{
size_t len2 = packet[4+len];
len2 += 255*packet[4+len+1];
len2 += 255*255*packet[4+len+2];
char* str2 = (char *)&packet[4+len+5];
snprintf(s, 5+len+len2, "long %s %s", (char *)str, (char *)str2);
}
else
{
snprintf(s, len, "%s", (char *)str);
}
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"%lu [gw_write] Wrote %d bytes : %s ",
pthread_self(),
w,
s)));
free(s);
}
}
}
#endif
return w; return w;
} }

View File

@ -92,6 +92,6 @@ extern GWBUF *gwbuf_clone(GWBUF *buf);
extern GWBUF *gwbuf_append(GWBUF *head, GWBUF *tail); extern GWBUF *gwbuf_append(GWBUF *head, GWBUF *tail);
extern GWBUF *gwbuf_consume(GWBUF *head, unsigned int length); extern GWBUF *gwbuf_consume(GWBUF *head, unsigned int length);
extern unsigned int gwbuf_length(GWBUF *head); extern unsigned int gwbuf_length(GWBUF *head);
extern GWBUF *gwbuf_clone_portion(GWBUF *head, size_t offset, size_t len);
#endif #endif

View File

@ -87,6 +87,7 @@ typedef struct gw_protocol {
int (*listen)(struct dcb *, char *); int (*listen)(struct dcb *, char *);
int (*auth)(struct dcb *, struct server *, struct session *, GWBUF *); int (*auth)(struct dcb *, struct server *, struct session *, GWBUF *);
int (*session)(struct dcb *, void *); int (*session)(struct dcb *, void *);
void* (*getstmt)(void* buf);
} GWPROTOCOL; } GWPROTOCOL;
/** /**
@ -176,7 +177,7 @@ typedef struct dcb {
SPINLOCK authlock; /**< Generic Authorization spinlock */ SPINLOCK authlock; /**< Generic Authorization spinlock */
DCBSTATS stats; /**< DCB related statistics */ DCBSTATS stats; /**< DCB related statistics */
unsigned int dcb_server_status; /*< the server role indicator from SERVER */
struct dcb *next; /**< Next DCB in the chain of allocated DCB's */ struct dcb *next; /**< Next DCB in the chain of allocated DCB's */
struct service *service; /**< The related service */ struct service *service; /**< The related service */
void *data; /**< Specific client data */ void *data; /**< Specific client data */
@ -202,7 +203,13 @@ int fail_accept_errno;
#define DCB_ISZOMBIE(x) ((x)->state == DCB_STATE_ZOMBIE) #define DCB_ISZOMBIE(x) ((x)->state == DCB_STATE_ZOMBIE)
DCB *dcb_get_zombies(void); DCB *dcb_get_zombies(void);
int gw_write(int fd, const void* buf, size_t nbytes); int gw_write(
#if defined(SS_DEBUG)
DCB* dcb,
#endif
int fd,
const void* buf,
size_t nbytes);
int dcb_write(DCB *, GWBUF *); int dcb_write(DCB *, GWBUF *);
DCB *dcb_alloc(dcb_role_t); DCB *dcb_alloc(dcb_role_t);
void dcb_free(DCB *); void dcb_free(DCB *);

View File

@ -56,6 +56,12 @@ int MySQLWrite(DCB *dcb, GWBUF *queue);
int gw_write_backend_event(DCB *dcb); int gw_write_backend_event(DCB *dcb);
int gw_read_backend_event(DCB *dcb); int gw_read_backend_event(DCB *dcb);
int setnonblocking(int fd); int setnonblocking(int fd);
int gw_write(int fd, const void* buf, size_t nbytes); int gw_write(
#if defined(SS_DEBUG)
DCB* dcb,
#endif
int fd,
const void* buf,
size_t nbytes);
int gw_getsockerrno(int fd); int gw_getsockerrno(int fd);
int parse_bindconfig(char *, unsigned short, struct sockaddr_in *); int parse_bindconfig(char *, unsigned short, struct sockaddr_in *);

View File

@ -46,12 +46,19 @@ typedef struct router_client_session ROUTER_CLIENT_SES;
typedef enum rses_property_type_t { typedef enum rses_property_type_t {
RSES_PROP_TYPE_UNDEFINED=0, RSES_PROP_TYPE_UNDEFINED=0,
RSES_PROP_TYPE_FIRST, RSES_PROP_TYPE_SESCMD,
RSES_PROP_TYPE_SESCMD=RSES_PROP_TYPE_FIRST, RSES_PROP_TYPE_FIRST = RSES_PROP_TYPE_SESCMD,
RSES_PROP_TYPE_LAST=RSES_PROP_TYPE_SESCMD, RSES_PROP_TYPE_LAST=RSES_PROP_TYPE_SESCMD,
RSES_PROP_TYPE_COUNT=RSES_PROP_TYPE_LAST+1 RSES_PROP_TYPE_COUNT=RSES_PROP_TYPE_LAST+1
} rses_property_type_t; } rses_property_type_t;
typedef enum backend_type_t {
BE_UNDEFINED=-1,
BE_MASTER,
BE_SLAVE,
BE_COUNT
} backend_type_t;
/** /**
* Session variable command * Session variable command
*/ */
@ -59,9 +66,9 @@ typedef struct mysql_sescmd_st {
#if defined(SS_DEBUG) #if defined(SS_DEBUG)
skygw_chk_t my_sescmd_chk_top; skygw_chk_t my_sescmd_chk_top;
#endif #endif
ROUTER_CLIENT_SES* my_sescmd_rsession; /*< parent router session */ // ROUTER_CLIENT_SES* my_sescmd_rsession; /*< parent router session */
rses_property_t* my_sescmd_prop; /*< parent property */ rses_property_t* my_sescmd_prop; /*< parent property */
GWBUF* my_sescmd_buf; /*< client query reference */ GWBUF* my_sescmd_buf; /*< query buffer */
bool my_sescmd_is_replied; /*< is cmd replied to client */ bool my_sescmd_is_replied; /*< is cmd replied to client */
#if defined(SS_DEBUG) #if defined(SS_DEBUG)
skygw_chk_t my_sescmd_chk_tail; skygw_chk_t my_sescmd_chk_tail;
@ -76,7 +83,8 @@ struct rses_property_st {
#if defined(SS_DEBUG) #if defined(SS_DEBUG)
skygw_chk_t rses_prop_chk_top; skygw_chk_t rses_prop_chk_top;
#endif #endif
SPINLOCK rses_prop_lock; /*< protect property content */ ROUTER_CLIENT_SES* rses_prop_rsession; /*< parent router session */
// SPINLOCK rses_prop_lock; /*< protect property content */
int rses_prop_refcount; int rses_prop_refcount;
rses_property_type_t rses_prop_type; rses_property_type_t rses_prop_type;
union rses_prop_data { union rses_prop_data {
@ -90,18 +98,13 @@ struct rses_property_st {
}; };
typedef struct sescmd_cursor_st { typedef struct sescmd_cursor_st {
ROUTER_CLIENT_SES* scmd_cur_rses; /*< pointer to owning router session */
rses_property_t** scmd_cur_ptr_property; /*< address of pointer to owner property */ rses_property_t** scmd_cur_ptr_property; /*< address of pointer to owner property */
mysql_sescmd_t* scmd_cur_cmd; /*< pointer to current session command */ mysql_sescmd_t* scmd_cur_cmd; /*< pointer to current session command */
bool scmd_cur_active; /*< true if command is being executed */ bool scmd_cur_active; /*< true if command is being executed */
backend_type_t scmd_cur_be_type; /*< BE_MASTER or BE_SLAVE */
} sescmd_cursor_t; } sescmd_cursor_t;
typedef enum backend_type_t {
BE_UNDEFINED=-1,
BE_MASTER,
BE_SLAVE,
BE_COUNT
} backend_type_t;
/** /**
* The client session structure used within this router. * The client session structure used within this router.
*/ */

View File

@ -521,7 +521,7 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
/*< /*<
* Don't write to backend if backend_dcb is not in poll set anymore. * Don't write to backend if backend_dcb is not in poll set anymore.
*/ */
spinlock_acquire(&dcb->authlock); spinlock_acquire(&dcb->dcb_initlock);
if (dcb->state != DCB_STATE_POLLING) { if (dcb->state != DCB_STATE_POLLING) {
/*< vraa : errorHandle */ /*< vraa : errorHandle */
/*< Free buffer memory */ /*< Free buffer memory */
@ -536,10 +536,12 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
dcb->fd, dcb->fd,
STRDCBSTATE(dcb->state)))); STRDCBSTATE(dcb->state))));
spinlock_release(&dcb->authlock); spinlock_release(&dcb->dcb_initlock);
return 0; return 0;
} }
spinlock_release(&dcb->dcb_initlock);
spinlock_acquire(&dcb->authlock);
/*< /*<
* Now put the incoming data to the delay queue unless backend is * Now put the incoming data to the delay queue unless backend is
* connected with auth ok * connected with auth ok
@ -553,7 +555,6 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
dcb, dcb,
dcb->fd, dcb->fd,
STRPROTOCOLSTATE(backend_protocol->state)))); STRPROTOCOLSTATE(backend_protocol->state))));
backend_set_delayqueue(dcb, queue); backend_set_delayqueue(dcb, queue);
spinlock_release(&dcb->authlock); spinlock_release(&dcb->authlock);
return 1; return 1;
@ -562,9 +563,9 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
/*< /*<
* Now we set the last command received, from the current queue * Now we set the last command received, from the current queue
*/ */
memcpy(&dcb->command, &queue->command, sizeof(dcb->command)); // memcpy(&dcb->command, &queue->command, sizeof(dcb->command));
spinlock_release(&dcb->authlock); spinlock_release(&dcb->authlock);
// LOGIF(LD, debuglog_statements(dcb, gwbuf_clone(queue)));
rc = dcb_write(dcb, queue); rc = dcb_write(dcb, queue);
return rc; return rc;
} }
@ -805,7 +806,7 @@ static int backend_write_delayqueue(DCB *dcb)
* Now we set the last command received, from the delayed queue * Now we set the last command received, from the delayed queue
*/ */
memcpy(&dcb->command, &localq->command, sizeof(dcb->command)); // memcpy(&dcb->command, &localq->command, sizeof(dcb->command));
spinlock_release(&dcb->delayqlock); spinlock_release(&dcb->delayqlock);
rc = dcb_write(dcb, localq); rc = dcb_write(dcb, localq);
@ -911,7 +912,6 @@ static int gw_change_user(DCB *backend, SERVER *server, SESSION *in_session, GWB
strcpy(current_session->user, username); strcpy(current_session->user, username);
strcpy(current_session->db, database); strcpy(current_session->db, database);
memcpy(current_session->client_sha1, client_sha1, sizeof(current_session->client_sha1)); memcpy(current_session->client_sha1, client_sha1, sizeof(current_session->client_sha1));
} }
// consume all the data received from client // consume all the data received from client

View File

@ -48,6 +48,7 @@ static int gw_MySQLWrite_client(DCB *dcb, GWBUF *queue);
static int gw_error_client_event(DCB *dcb); static int gw_error_client_event(DCB *dcb);
static int gw_client_close(DCB *dcb); static int gw_client_close(DCB *dcb);
static int gw_client_hangup_event(DCB *dcb); static int gw_client_hangup_event(DCB *dcb);
static void* gw_MySQL_get_next_stmt(void* buffer);
int mysql_send_ok(DCB *dcb, int packet_number, int in_affected_rows, const char* mysql_message); int mysql_send_ok(DCB *dcb, int packet_number, int in_affected_rows, const char* mysql_message);
int MySQLSendHandshake(DCB* dcb); int MySQLSendHandshake(DCB* dcb);
@ -67,8 +68,9 @@ static GWPROTOCOL MyObject = {
gw_client_close, /* Close */ gw_client_close, /* Close */
gw_MySQLListener, /* Listen */ gw_MySQLListener, /* Listen */
NULL, /* Authentication */ NULL, /* Authentication */
NULL /* Session */ NULL, /* Session */
}; gw_MySQL_get_next_stmt /* get single stmt from read buf */
};
/** /**
* Implementation of the mandatory version entry point * Implementation of the mandatory version entry point
@ -607,8 +609,7 @@ int gw_read_client_event(DCB* dcb) {
*/ */
{ {
int len = -1; int len = -1;
GWBUF *queue = NULL; GWBUF *read_buffer = NULL;
GWBUF *gw_buffer = NULL;
uint8_t *ptr_buff = NULL; uint8_t *ptr_buff = NULL;
int mysql_command = -1; int mysql_command = -1;
@ -626,16 +627,15 @@ int gw_read_client_event(DCB* dcb) {
////////////////////////////////////////////////////// //////////////////////////////////////////////////////
// read and handle errors & close, or return if busy // read and handle errors & close, or return if busy
////////////////////////////////////////////////////// //////////////////////////////////////////////////////
rc = gw_read_gwbuff(dcb, &gw_buffer, b); rc = gw_read_gwbuff(dcb, &read_buffer, b);
if (rc != 0) { if (rc != 0) {
goto return_rc; goto return_rc;
} }
/* Now, we are assuming in the first buffer there is /* Now, we are assuming in the first buffer there is
* the information form mysql command */ * the information form mysql command */
queue = gw_buffer; len = GWBUF_LENGTH(read_buffer);
len = GWBUF_LENGTH(queue); ptr_buff = GWBUF_DATA(read_buffer);
ptr_buff = GWBUF_DATA(queue);
/* get mysql commang at fifth byte */ /* get mysql commang at fifth byte */
if (ptr_buff) { if (ptr_buff) {
@ -669,12 +669,12 @@ int gw_read_client_event(DCB* dcb) {
} }
rc = 1; rc = 1;
/** Free buffer */ /** Free buffer */
queue = gwbuf_consume(queue, len); read_buffer = gwbuf_consume(read_buffer, len);
goto return_rc; goto return_rc;
} }
/** Route COM_QUIT to backend */ /** Route COM_QUIT to backend */
if (mysql_command == '\x01') { if (mysql_command == '\x01') {
router->routeQuery(router_instance, rsession, queue); router->routeQuery(router_instance, rsession, read_buffer);
LOGIF(LD, (skygw_log_write_flush( LOGIF(LD, (skygw_log_write_flush(
LOGFILE_DEBUG, LOGFILE_DEBUG,
"%lu [gw_read_client_event] Routed COM_QUIT to " "%lu [gw_read_client_event] Routed COM_QUIT to "
@ -693,7 +693,7 @@ int gw_read_client_event(DCB* dcb) {
/** Route other commands to backend */ /** Route other commands to backend */
rc = router->routeQuery(router_instance, rc = router->routeQuery(router_instance,
rsession, rsession,
queue); read_buffer);
/** succeed */ /** succeed */
if (rc == 1) { if (rc == 1) {
rc = 0; /**< here '0' means success */ rc = 0; /**< here '0' means success */
@ -1203,3 +1203,47 @@ gw_client_hangup_event(DCB *dcb)
return_rc: return_rc:
return rc; return rc;
} }
/**
* Remove the first mysql statement from buffer. Return pointer to the removed
* statement or NULL if buffer is empty.
*
* Clone buf, calculate the length of included mysql stmt, and point the
* statement with cloned buffer. Move the start pointer of buf accordingly
* so that it only cover the remaining buffer.
*
*/
static void* gw_MySQL_get_next_stmt(
void* buffer)
{
GWBUF* readbuf = (GWBUF *)buffer;
GWBUF* stmtbuf;
unsigned char* packet;
size_t len;
CHK_GWBUF(readbuf);
if (GWBUF_EMPTY(readbuf))
{
stmtbuf = NULL;
goto return_stmtbuf;
}
packet = GWBUF_DATA(readbuf);
len = packet[0];
len += 255*packet[1];
len += 255*255*packet[2];
/** vraa :Multi-packet stmt is not supported as of 7.3.14 */
if (len+4 > GWBUF_LENGTH(readbuf))
{
stmtbuf = NULL;
goto return_stmtbuf;
}
stmtbuf = gwbuf_clone_portion(readbuf, 0, 4+len);
gwbuf_consume(readbuf, 4+len);
return_stmtbuf:
return (void *)stmtbuf;
}

View File

@ -109,19 +109,25 @@ static void rses_property_add(
static void rses_property_done( static void rses_property_done(
rses_property_t* prop); rses_property_t* prop);
static mysql_sescmd_t* rses_property_get_sescmd(
rses_property_t* prop);
static sescmd_cursor_t* rses_get_sescmd_cursor( static sescmd_cursor_t* rses_get_sescmd_cursor(
ROUTER_CLIENT_SES* rses, ROUTER_CLIENT_SES* rses,
backend_type_t be_type); backend_type_t be_type);
static bool execute_sescmd_in_backend( static bool execute_sescmd_in_backend(
ROUTER_CLIENT_SES* rses, ROUTER_CLIENT_SES* rses,
backend_type_t be_type); backend_type_t be_type);
static void sescmd_cursor_set_active(
sescmd_cursor_t* sescmd_cursor,
bool value);
static bool sescmd_cursor_is_active( static bool sescmd_cursor_is_active(
sescmd_cursor_t* sescmd_cursor); sescmd_cursor_t* sescmd_cursor);
static GWBUF* sescmd_cursor_get_querybuf( static GWBUF* sescmd_cursor_clone_querybuf(
sescmd_cursor_t* scur); sescmd_cursor_t* scur);
static mysql_sescmd_t* sescmd_cursor_get_command( static mysql_sescmd_t* sescmd_cursor_get_command(
@ -130,14 +136,21 @@ static mysql_sescmd_t* sescmd_cursor_get_command(
static bool sescmd_cursor_next( static bool sescmd_cursor_next(
sescmd_cursor_t* scur); sescmd_cursor_t* scur);
static void sescmd_reply_to_client( static bool sescmd_reply_to_client(
DCB* client_dcb, DCB* client_dcb,
mysql_sescmd_t* scmd); mysql_sescmd_t* scmd,
GWBUF* writebuf);
static bool cont_exec_sescmd_in_backend( static bool cont_exec_sescmd_in_backend(
ROUTER_CLIENT_SES* rses, ROUTER_CLIENT_SES* rses,
backend_type_t be_type); backend_type_t be_type);
static void tracelog_routed_query(
ROUTER_CLIENT_SES* rses,
char* funcname,
DCB* dcb,
GWBUF* buf);
static SPINLOCK instlock; static SPINLOCK instlock;
static ROUTER_INSTANCE* instances; static ROUTER_INSTANCE* instances;
@ -337,15 +350,19 @@ static void* newSession(
client_rses->rses_chk_tail = CHK_NUM_ROUTER_SES; client_rses->rses_chk_tail = CHK_NUM_ROUTER_SES;
#endif #endif
/** store pointers to sescmd list to both cursors */ /** store pointers to sescmd list to both cursors */
client_rses->rses_cursor[BE_MASTER].scmd_cur_rses = client_rses;
client_rses->rses_cursor[BE_MASTER].scmd_cur_active = false; client_rses->rses_cursor[BE_MASTER].scmd_cur_active = false;
client_rses->rses_cursor[BE_MASTER].scmd_cur_cmd = NULL;
client_rses->rses_cursor[BE_MASTER].scmd_cur_ptr_property = client_rses->rses_cursor[BE_MASTER].scmd_cur_ptr_property =
&client_rses->rses_properties[RSES_PROP_TYPE_SESCMD]; &client_rses->rses_properties[RSES_PROP_TYPE_SESCMD];
client_rses->rses_cursor[BE_MASTER].scmd_cur_cmd = NULL;
client_rses->rses_cursor[BE_MASTER].scmd_cur_be_type = BE_MASTER;
client_rses->rses_cursor[BE_SLAVE].scmd_cur_rses = client_rses;
client_rses->rses_cursor[BE_SLAVE].scmd_cur_active = false; client_rses->rses_cursor[BE_SLAVE].scmd_cur_active = false;
client_rses->rses_cursor[BE_SLAVE].scmd_cur_cmd = NULL;
client_rses->rses_cursor[BE_SLAVE].scmd_cur_ptr_property = client_rses->rses_cursor[BE_SLAVE].scmd_cur_ptr_property =
&client_rses->rses_properties[RSES_PROP_TYPE_SESCMD]; &client_rses->rses_properties[RSES_PROP_TYPE_SESCMD];
client_rses->rses_cursor[BE_SLAVE].scmd_cur_cmd = NULL;
client_rses->rses_cursor[BE_SLAVE].scmd_cur_be_type = BE_SLAVE;
/** /**
* Find a backend server to connect to. This is the extent of the * Find a backend server to connect to. This is the extent of the
@ -544,6 +561,7 @@ static int routeQuery(
GWBUF* querybuf) GWBUF* querybuf)
{ {
skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN; skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN;
GWBUF* stmtbuf;
char* querystr = NULL; char* querystr = NULL;
char* startpos; char* startpos;
size_t len; size_t len;
@ -552,7 +570,7 @@ static int routeQuery(
int ret = 0; int ret = 0;
DCB* master_dcb = NULL; DCB* master_dcb = NULL;
DCB* slave_dcb = NULL; DCB* slave_dcb = NULL;
GWBUF* bufcopy = NULL; // GWBUF* bufcopy = NULL;
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance; ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance;
ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session; ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
bool rses_is_closed; bool rses_is_closed;
@ -561,8 +579,21 @@ static int routeQuery(
CHK_CLIENT_RSES(router_cli_ses); CHK_CLIENT_RSES(router_cli_ses);
inst->stats.n_queries++; inst->stats.n_queries++;
/** Lock router session */
if (!rses_begin_locked_router_action(router_cli_ses))
{
goto return_ret;
}
master_dcb = router_cli_ses->rses_dcb[BE_MASTER];
packet = GWBUF_DATA(querybuf); /** stmtbuf is clone of querybuf, and only covers one stmt */
stmtbuf = (GWBUF *)master_dcb->session->client->func.getstmt((void *)querybuf);
/** Unlock router session */
rses_end_locked_router_action(router_cli_ses);
while (stmtbuf != NULL)
{
packet = GWBUF_DATA(stmtbuf);
packet_type = packet[4]; packet_type = packet[4];
startpos = (char *)&packet[5]; startpos = (char *)&packet[5];
len = packet[0]; len = packet[0];
@ -613,7 +644,8 @@ static int routeQuery(
/** /**
* Lock router client session for secure read of DCBs * Lock router client session for secure read of DCBs
*/ */
rses_is_closed = !(rses_begin_locked_router_action(router_cli_ses)); rses_is_closed =
!(rses_begin_locked_router_action(router_cli_ses));
} }
if (!rses_is_closed) if (!rses_is_closed)
@ -628,14 +660,14 @@ static int routeQuery(
{ {
LOGIF(LE, (skygw_log_write_flush( LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR, LOGFILE_ERROR,
"Error: Failed to route %s:%s:\"%s\" to backend server. " "Error: Failed to route %s:%s:\"%s\" to "
"%s.", "backend server. %s.",
STRPACKETTYPE(packet_type), STRPACKETTYPE(packet_type),
STRQTYPE(qtype), STRQTYPE(qtype),
(querystr == NULL ? "(empty)" : querystr), (querystr == NULL ? "(empty)" : querystr),
(rses_is_closed ? "Router was closed" : (rses_is_closed ? "Router was closed" :
"Router has no backend servers where to route to")))); "Router has no backend servers where to "
"route to"))));
goto return_ret; goto return_ret;
} }
@ -654,7 +686,13 @@ static int routeQuery(
"routing to Master.", "routing to Master.",
pthread_self(), pthread_self(),
STRQTYPE(qtype)))); STRQTYPE(qtype))));
ret = master_dcb->func.write(master_dcb, querybuf);
LOGIF(LT, tracelog_routed_query(router_cli_ses,
"routeQuery",
master_dcb,
gwbuf_clone(stmtbuf)));
ret = master_dcb->func.write(master_dcb, stmtbuf);
atomic_add(&inst->stats.n_master, 1); atomic_add(&inst->stats.n_master, 1);
goto return_ret; goto return_ret;
@ -673,16 +711,30 @@ static int routeQuery(
/** Log error to debug */ /** Log error to debug */
goto return_ret; goto return_ret;
} }
/** If session command is being executed, route to master */ /**
* If session command is being executed in slave
* route to master
*/
if (sescmd_cursor_is_active(rses_get_sescmd_cursor( if (sescmd_cursor_is_active(rses_get_sescmd_cursor(
router_cli_ses, router_cli_ses,
BE_MASTER))) BE_SLAVE)))
{ {
ret = master_dcb->func.write(master_dcb, querybuf); LOGIF(LT, tracelog_routed_query(router_cli_ses,
"routeQuery",
master_dcb,
gwbuf_clone(stmtbuf)));
ret = master_dcb->func.write(master_dcb, stmtbuf);
atomic_add(&inst->stats.n_master, 1); atomic_add(&inst->stats.n_master, 1);
} }
else{ else
ret = slave_dcb->func.write(slave_dcb, querybuf); {
LOGIF(LT, tracelog_routed_query(router_cli_ses,
"routeQuery",
slave_dcb,
gwbuf_clone(stmtbuf)));
ret = slave_dcb->func.write(slave_dcb, stmtbuf);
atomic_add(&inst->stats.n_slave, 1); atomic_add(&inst->stats.n_slave, 1);
} }
rses_end_locked_router_action(router_cli_ses); rses_end_locked_router_action(router_cli_ses);
@ -741,27 +793,41 @@ static int routeQuery(
STRQTYPE(qtype), STRQTYPE(qtype),
STRPACKETTYPE(packet_type)))); STRPACKETTYPE(packet_type))));
bufcopy = gwbuf_clone(querybuf);
switch(packet_type) { switch(packet_type) {
/**
case COM_QUIT: case COM_QUIT:
ret = master_dcb->func.write(master_dcb, querybuf); ret = master_dcb->func.write(master_dcb, gwbuf_clone(querybuf));
slave_dcb->func.write(slave_dcb, bufcopy); slave_dcb->func.write(slave_dcb, querybuf);
break; break;
*/
case COM_CHANGE_USER: case COM_CHANGE_USER:
LOGIF(LT, tracelog_routed_query(
router_cli_ses,
"routeQuery",
master_dcb,
gwbuf_clone(stmtbuf)));
master_dcb->func.auth( master_dcb->func.auth(
master_dcb, master_dcb,
NULL, NULL,
master_dcb->session, master_dcb->session,
querybuf); gwbuf_clone(stmtbuf));
LOGIF(LT, tracelog_routed_query(
router_cli_ses,
"routeQuery",
slave_dcb,
gwbuf_clone(stmtbuf)));
slave_dcb->func.auth( slave_dcb->func.auth(
slave_dcb, slave_dcb,
NULL, NULL,
master_dcb->session, master_dcb->session,
bufcopy); stmtbuf);
break; break;
case COM_QUIT:
case COM_QUERY: case COM_QUERY:
/** /**
* 1. Create new property of type RSES_PROP_TYPE_SESCMD. * 1. Create new property of type RSES_PROP_TYPE_SESCMD.
@ -782,9 +848,7 @@ static int routeQuery(
* prevent it from being released before properties * prevent it from being released before properties
* are cleaned up as a part of router sessionclean-up. * are cleaned up as a part of router sessionclean-up.
*/ */
mysql_sescmd_init(prop, mysql_sescmd_init(prop, stmtbuf, router_cli_ses);
gwbuf_clone(querybuf),
router_cli_ses);
/** Lock router session */ /** Lock router session */
if (!rses_begin_locked_router_action(router_cli_ses)) if (!rses_begin_locked_router_action(router_cli_ses))
@ -796,12 +860,20 @@ static int routeQuery(
rses_property_add(router_cli_ses, prop); rses_property_add(router_cli_ses, prop);
/** Execute session command in master */ /** Execute session command in master */
if (!execute_sescmd_in_backend(router_cli_ses, BE_MASTER)) if (execute_sescmd_in_backend(router_cli_ses, BE_MASTER))
{
ret = 1;
}
else
{ {
/** Log error */ /** Log error */
} }
/** Execute session command in slave */ /** Execute session command in slave */
if (!execute_sescmd_in_backend(router_cli_ses, BE_SLAVE)) if (execute_sescmd_in_backend(router_cli_ses, BE_SLAVE))
{
ret = 1;
}
else
{ {
/** Log error */ /** Log error */
} }
@ -811,9 +883,19 @@ static int routeQuery(
break; break;
default: default:
ret = master_dcb->func.session(master_dcb, LOGIF(LT, tracelog_routed_query(router_cli_ses,
(void *)querybuf); "routeQuery",
slave_dcb->func.session(slave_dcb, (void *)bufcopy); master_dcb,
gwbuf_clone(stmtbuf)));
ret = master_dcb->func.write(master_dcb,
(void *)gwbuf_clone(stmtbuf));
LOGIF(LT, tracelog_routed_query(router_cli_ses,
"routeQuery",
slave_dcb,
gwbuf_clone(stmtbuf)));
slave_dcb->func.write(slave_dcb, (void *)stmtbuf);
break; break;
} /**< switch by packet type */ } /**< switch by packet type */
@ -833,12 +915,28 @@ static int routeQuery(
* Is this really ok? * Is this really ok?
* What is not known is routed to master. * What is not known is routed to master.
*/ */
ret = master_dcb->func.write(master_dcb, querybuf); LOGIF(LT, tracelog_routed_query(router_cli_ses,
"routeQuery",
master_dcb,
gwbuf_clone(stmtbuf)));
ret = master_dcb->func.write(master_dcb, stmtbuf);
atomic_add(&inst->stats.n_master, 1); atomic_add(&inst->stats.n_master, 1);
goto return_ret; goto return_ret;
break; break;
} /**< switch by query type */ } /*< switch by query type */
/** get next stmt */
if (!rses_begin_locked_router_action(router_cli_ses))
{
goto return_ret;
}
master_dcb = router_cli_ses->rses_dcb[BE_MASTER];
/** stmtbuf is clone of querybuf, and only covers one stmt */
stmtbuf = (GWBUF *)master_dcb->session->client->func.getstmt((void *)querybuf);
rses_end_locked_router_action(router_cli_ses);
} /* while (stmtbuf != NULL) */
return_ret: return_ret:
free(querystr); free(querystr);
return ret; return ret;
@ -984,16 +1082,20 @@ static void clientReply(
*/ */
if (!rses_begin_locked_router_action(router_cli_ses)) if (!rses_begin_locked_router_action(router_cli_ses))
{ {
/** is this needed ??*/
gwbuf_consume(writebuf, gwbuf_length(writebuf));
goto lock_failed; goto lock_failed;
} }
master_dcb = router_cli_ses->rses_dcb[BE_MASTER]; master_dcb = router_cli_ses->rses_dcb[BE_MASTER];
slave_dcb = router_cli_ses->rses_dcb[BE_SLAVE]; slave_dcb = router_cli_ses->rses_dcb[BE_SLAVE];
/** Holding lock ensures that router session remains open */
ss_dassert(backend_dcb->session != NULL);
client_dcb = backend_dcb->session->client;
/** Unlock */ /** Unlock */
rses_end_locked_router_action(router_cli_ses); rses_end_locked_router_action(router_cli_ses);
client_dcb = backend_dcb->session->client;
/** /**
* 1. Check if backend received reply to sescmd. * 1. Check if backend received reply to sescmd.
* 2. Check sescmd's state whether OK_PACKET has been * 2. Check sescmd's state whether OK_PACKET has been
@ -1018,9 +1120,13 @@ static void clientReply(
{ {
be_type = BE_SLAVE; be_type = BE_SLAVE;
} }
/** Lock router session */
if (!rses_begin_locked_router_action(router_cli_ses))
{
/** Log to debug that router was closed */
goto lock_failed;
}
scur = rses_get_sescmd_cursor(router_cli_ses, be_type); scur = rses_get_sescmd_cursor(router_cli_ses, be_type);
ss_dassert(writebuf == sescmd_cursor_get_querybuf(scur));
/** /**
* Active cursor means that reply is from session command * Active cursor means that reply is from session command
* execution. * execution.
@ -1028,17 +1134,9 @@ static void clientReply(
if (sescmd_cursor_is_active(scur)) if (sescmd_cursor_is_active(scur))
{ {
mysql_sescmd_t* scmd = sescmd_cursor_get_command(scur); mysql_sescmd_t* scmd = sescmd_cursor_get_command(scur);
sescmd_reply_to_client(client_dcb, scmd, writebuf);
sescmd_reply_to_client(client_dcb, scmd); /** Read next sescmd property */
/**
* If there is a pending sescmd, start its execution.
*/
if (!rses_begin_locked_router_action(router_cli_ses))
{
goto lock_failed;
}
/** Start execution of all pending ses commands. */
while (sescmd_cursor_next(scur)) while (sescmd_cursor_next(scur))
{ {
if (!cont_exec_sescmd_in_backend(router_cli_ses, be_type)) if (!cont_exec_sescmd_in_backend(router_cli_ses, be_type))
@ -1050,8 +1148,26 @@ static void clientReply(
/** Log execution of pending sescmd */ /** Log execution of pending sescmd */
} }
} }
/** Set cursor passive. */
sescmd_cursor_set_active(scur, false);
/** Unlock router session */
rses_end_locked_router_action(router_cli_ses); rses_end_locked_router_action(router_cli_ses);
} }
else if (client_dcb != NULL)
{
/** Write reply to client DCB */
client_dcb->func.write(client_dcb, writebuf);
/** Unlock router session */
rses_end_locked_router_action(router_cli_ses);
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"%lu [clientReply:rwsplit] client dcb %p, "
"backend dcb %p. End of normal reply.",
pthread_self(),
client_dcb,
backend_dcb)));
}
return; /*< succeed */ return; /*< succeed */
lock_failed: lock_failed:
/** log that router session couldn't be locked */ /** log that router session couldn't be locked */
@ -1230,7 +1346,7 @@ static rses_property_t* rses_property_init(
{ {
goto return_prop; goto return_prop;
} }
spinlock_init(&prop->rses_prop_lock); // spinlock_init(&prop->rses_prop_lock);
prop->rses_prop_type = prop_type; prop->rses_prop_type = prop_type;
#if defined(SS_DEBUG) #if defined(SS_DEBUG)
prop->rses_prop_chk_top = CHK_NUM_ROUTER_PROPERTY; prop->rses_prop_chk_top = CHK_NUM_ROUTER_PROPERTY;
@ -1242,29 +1358,6 @@ return_prop:
return prop; return prop;
} }
/**
* Create session command property.
*/
static mysql_sescmd_t* mysql_sescmd_init (
rses_property_t* rses_prop,
GWBUF* sescmd_buf,
ROUTER_CLIENT_SES* rses)
{
mysql_sescmd_t* sescmd;
CHK_RSES_PROP(rses_prop);
sescmd = &rses_prop->rses_prop_data.sescmd;
sescmd->my_sescmd_prop = rses_prop; /*< reference to owning property */
#if defined(SS_DEBUG)
sescmd->my_sescmd_chk_top = CHK_NUM_MY_SESCMD;
sescmd->my_sescmd_chk_tail = CHK_NUM_MY_SESCMD;
#endif
sescmd->my_sescmd_buf = sescmd_buf; /*< session command query */
ss_dassert(sescmd_buf->sbuf->refcount > 0);
return sescmd;
}
/** /**
* Property is freed at the end of router client session. * Property is freed at the end of router client session.
*/ */
@ -1292,15 +1385,6 @@ static void rses_property_done(
free(prop); free(prop);
} }
static void mysql_sescmd_done(
mysql_sescmd_t* sescmd)
{
CHK_RSES_PROP(sescmd->my_sescmd_prop);
gwbuf_free(sescmd->my_sescmd_buf);
memset(sescmd, 0, sizeof(mysql_sescmd_t));
}
/** /**
* Add property to the router_client_ses structure's rses_properties * Add property to the router_client_ses structure's rses_properties
* array. The slot is determined by the type of property. * array. The slot is determined by the type of property.
@ -1316,7 +1400,9 @@ static void rses_property_add(
CHK_CLIENT_RSES(rses); CHK_CLIENT_RSES(rses);
CHK_RSES_PROP(prop); CHK_RSES_PROP(prop);
ss_dassert(rses->rses_lock.lock != 0); ss_dassert(SPINLOCK_IS_LOCKED(&rses->rses_lock));
prop->rses_prop_rsession = rses;
p = rses->rses_properties[prop->rses_prop_type]; p = rses->rses_properties[prop->rses_prop_type];
if (p == NULL) if (p == NULL)
@ -1333,6 +1419,28 @@ static void rses_property_add(
} }
} }
/** Router sessiosn must be locked */
static mysql_sescmd_t* rses_property_get_sescmd(
rses_property_t* prop)
{
mysql_sescmd_t* sescmd;
CHK_RSES_PROP(prop);
ss_dassert(prop->rses_prop_rsession == NULL ||
SPINLOCK_IS_LOCKED(&prop->rses_prop_rsession->rses_lock));
sescmd = &prop->rses_prop_data.sescmd;
if (sescmd != NULL)
{
CHK_MYSQL_SESCMD(sescmd);
}
return sescmd;
}
// static rses_property_t* rses_property_get_ptr_next(
/**
static void rses_begin_locked_property_action( static void rses_begin_locked_property_action(
rses_property_t* prop) rses_property_t* prop)
{ {
@ -1346,48 +1454,103 @@ static void rses_end_locked_property_action(
CHK_RSES_PROP(prop); CHK_RSES_PROP(prop);
spinlock_release(&prop->rses_prop_lock); spinlock_release(&prop->rses_prop_lock);
} }
*/
/**
/** router must be locked */ * Create session command property.
static void sescmd_cursor_set_active( */
sescmd_cursor_t* sescmd_cursor, static mysql_sescmd_t* mysql_sescmd_init (
bool value) rses_property_t* rses_prop,
GWBUF* sescmd_buf,
ROUTER_CLIENT_SES* rses)
{ {
ss_dassert(SPINLOCK_IS_LOCKED(&(*sescmd_cursor->scmd_cur_ptr_property)->rses_prop_lock)); mysql_sescmd_t* sescmd;
/** avoid calling unnecessarily */
ss_dassert(sescmd_cursor->scmd_cur_active != value); CHK_RSES_PROP(rses_prop);
sescmd_cursor->scmd_cur_active = value; /** Can't call rses_property_get_sescmd with uninitialized sescmd */
sescmd = &rses_prop->rses_prop_data.sescmd;
sescmd->my_sescmd_prop = rses_prop; /*< reference to owning property */
// sescmd->my_sescmd_rsession = rses;
#if defined(SS_DEBUG)
sescmd->my_sescmd_chk_top = CHK_NUM_MY_SESCMD;
sescmd->my_sescmd_chk_tail = CHK_NUM_MY_SESCMD;
#endif
/** Set session command buffer */
sescmd->my_sescmd_buf = sescmd_buf;
return sescmd;
} }
static void sescmd_reply_to_client( static void mysql_sescmd_done(
DCB* client_dcb, mysql_sescmd_t* sescmd)
mysql_sescmd_t* scmd)
{ {
rses_property_t* prop; CHK_RSES_PROP(sescmd->my_sescmd_prop);
gwbuf_free(sescmd->my_sescmd_buf);
memset(sescmd, 0, sizeof(mysql_sescmd_t));
}
/**
* Write session command reply from backend to client if command haven't yet
* been replied.
* Return true if succeed, false if command was already replied.
*
* Router session must be locked */
static bool sescmd_reply_to_client(
DCB* client_dcb,
mysql_sescmd_t* scmd,
GWBUF* writebuf)
{
bool succp = false;
// rses_property_t* prop;
CHK_DCB(client_dcb); CHK_DCB(client_dcb);
CHK_MYSQL_SESCMD(scmd); CHK_MYSQL_SESCMD(scmd);
CHK_GWBUF(scmd->my_sescmd_buf); CHK_GWBUF(writebuf);
ss_dassert(SPINLOCK_IS_LOCKED(
&scmd->my_sescmd_prop->rses_prop_rsession->rses_lock));
prop = mysql_sescmd_get_property(scmd); // prop = mysql_sescmd_get_property(scmd);
rses_begin_locked_property_action(prop); // rses_begin_locked_property_action(prop);
if (!scmd->my_sescmd_is_replied) if (!scmd->my_sescmd_is_replied)
{ {
CHK_GWBUF(scmd->my_sescmd_buf); client_dcb->func.write(client_dcb, writebuf);
client_dcb->func.write(client_dcb, scmd->my_sescmd_buf);
scmd->my_sescmd_is_replied = true; scmd->my_sescmd_is_replied = true;
succp = true;
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"%lu [sescmd_reply_to_client] Replied to client dcb %p.",
pthread_self(),
client_dcb)));
} }
rses_end_locked_property_action(prop); else
{
}
// rses_end_locked_property_action(prop);
return succp;
} }
/**
* Get the address of current session command.
*
* Router session must be locked */
static mysql_sescmd_t* sescmd_cursor_get_command( static mysql_sescmd_t* sescmd_cursor_get_command(
sescmd_cursor_t* scur) sescmd_cursor_t* scur)
{ {
mysql_sescmd_t* scmd = scur->scmd_cur_cmd; mysql_sescmd_t* scmd;
CHK_MYSQL_SESCMD(scmd);
ss_dassert(SPINLOCK_IS_LOCKED(&scur->scmd_cur_rses->rses_lock));
scur->scmd_cur_cmd = rses_property_get_sescmd(*scur->scmd_cur_ptr_property);
CHK_MYSQL_SESCMD(scur->scmd_cur_cmd);
scmd = scur->scmd_cur_cmd;
return scmd; return scmd;
} }
@ -1398,6 +1561,8 @@ static sescmd_cursor_t* rses_get_sescmd_cursor(
backend_type_t be_type) backend_type_t be_type)
{ {
CHK_CLIENT_RSES(rses); CHK_CLIENT_RSES(rses);
ss_dassert(SPINLOCK_IS_LOCKED(&rses->rses_lock));
return &rses->rses_cursor[be_type]; return &rses->rses_cursor[be_type];
} }
@ -1405,18 +1570,35 @@ static sescmd_cursor_t* rses_get_sescmd_cursor(
static bool sescmd_cursor_is_active( static bool sescmd_cursor_is_active(
sescmd_cursor_t* sescmd_cursor) sescmd_cursor_t* sescmd_cursor)
{ {
bool succp = sescmd_cursor->scmd_cur_active; bool succp;
ss_dassert(SPINLOCK_IS_LOCKED(&sescmd_cursor->scmd_cur_rses->rses_lock));
succp = sescmd_cursor->scmd_cur_active;
return succp; return succp;
} }
/** Router session must be locked */ /** router must be locked */
static GWBUF* sescmd_cursor_get_querybuf( static void sescmd_cursor_set_active(
sescmd_cursor_t* sescmd_cursor,
bool value)
{
ss_dassert(SPINLOCK_IS_LOCKED(&sescmd_cursor->scmd_cur_rses->rses_lock));
/** avoid calling unnecessarily */
ss_dassert(sescmd_cursor->scmd_cur_active != value);
sescmd_cursor->scmd_cur_active = value;
}
/**
* Clone session command's command buffer.
* Router session must be locked
*/
static GWBUF* sescmd_cursor_clone_querybuf(
sescmd_cursor_t* scur) sescmd_cursor_t* scur)
{ {
GWBUF* buf; GWBUF* buf;
ss_dassert(scur->scmd_cur_cmd != NULL); ss_dassert(scur->scmd_cur_cmd != NULL);
buf = scur->scmd_cur_cmd->my_sescmd_buf; buf = gwbuf_clone(scur->scmd_cur_cmd->my_sescmd_buf);
CHK_GWBUF(buf); CHK_GWBUF(buf);
return buf; return buf;
@ -1447,24 +1629,48 @@ static bool execute_sescmd_in_backend(
CHK_CLIENT_RSES(rses); CHK_CLIENT_RSES(rses);
ss_dassert(SPINLOCK_IS_LOCKED(&rses->rses_lock)); ss_dassert(SPINLOCK_IS_LOCKED(&rses->rses_lock));
/**
* Get cursor pointer and copy of command buffer to cursor.
*/
scur = rses_get_sescmd_cursor(rses, be_type); scur = rses_get_sescmd_cursor(rses, be_type);
/** Return if there are no pending ses commands */ /** Return if there are no pending ses commands */
if (scur->scmd_cur_cmd == NULL) if (sescmd_cursor_get_command(scur) == NULL)
{ {
succp = false; succp = false;
goto return_succp;
} }
if (!sescmd_cursor_is_active(scur)) if (!sescmd_cursor_is_active(scur))
{ {
/** Cursor is left active when function returns. */
sescmd_cursor_set_active(scur, true); sescmd_cursor_set_active(scur, true);
rc = dcb->func.session(dcb, sescmd_cursor_get_querybuf(scur));
if (rc != 0) LOGIF(LT, tracelog_routed_query(rses,
"execute_sescmd_in_backend",
dcb,
sescmd_cursor_clone_querybuf(scur)));
rc = dcb->func.write(dcb, sescmd_cursor_clone_querybuf(scur));
// rc = dcb->func.session(dcb, sescmd_cursor_clone_querybuf(scur));
if (rc != 1)
{ {
succp = false; succp = false;
} }
} }
else
{
LOGIF(LD, (skygw_log_write_flush(
LOGFILE_DEBUG,
"%lu [routeQuery] Couldn't directly send SESSION "
"WRITER command to dcb %p because session command "
"cursor was executing previous command. Added "
"command to the queue.",
pthread_self(),
dcb)));
}
return_succp:
return succp; return succp;
} }
@ -1480,33 +1686,39 @@ static bool cont_exec_sescmd_in_backend(
ROUTER_CLIENT_SES* rses, ROUTER_CLIENT_SES* rses,
backend_type_t be_type) backend_type_t be_type)
{ {
bool succp = true;
DCB* dcb; DCB* dcb;
bool succp = true;
int rc = 0;
sescmd_cursor_t* scur; sescmd_cursor_t* scur;
int rc;
CHK_CLIENT_RSES(rses);
dcb = rses->rses_dcb[be_type]; dcb = rses->rses_dcb[be_type];
CHK_DCB(dcb);
CHK_DCB(dcb);
CHK_CLIENT_RSES(rses);
ss_dassert(SPINLOCK_IS_LOCKED(&rses->rses_lock)); ss_dassert(SPINLOCK_IS_LOCKED(&rses->rses_lock));
scur = rses_get_sescmd_cursor(rses, be_type); scur = rses_get_sescmd_cursor(rses, be_type);
ss_dassert(sescmd_cursor_is_active(scur)); ss_dassert(sescmd_cursor_is_active(scur));
/** Return if there are no pending ses commands */
if (scur->scmd_cur_cmd == NULL) if (scur->scmd_cur_cmd == NULL)
{ {
succp = false; succp = false;
goto return_succp;
} }
rc = dcb->func.session(dcb, sescmd_cursor_get_querybuf(scur));
if (rc != 0) LOGIF(LT, tracelog_routed_query(rses,
"cont_exec_sescmd_in_backend",
dcb,
sescmd_cursor_clone_querybuf(scur)));
// rc = dcb->func.session(dcb, sescmd_cursor_clone_querybuf(scur));
rc = dcb->func.write(dcb, sescmd_cursor_clone_querybuf(scur));
if (rc != 1)
{ {
succp = false; succp = false;
} }
return_succp:
return succp; return succp;
} }
@ -1523,45 +1735,58 @@ static bool sescmd_cursor_next(
rses_property_t* prop_curr; rses_property_t* prop_curr;
rses_property_t* prop_next; rses_property_t* prop_next;
ss_dassert(SPINLOCK_IS_LOCKED(&(*scur->scmd_cur_ptr_property)->rses_prop_lock)); ss_dassert(scur != NULL);
ss_dassert(*(scur->scmd_cur_ptr_property) != NULL);
ss_dassert(SPINLOCK_IS_LOCKED(
&(*(scur->scmd_cur_ptr_property))->rses_prop_rsession->rses_lock));
/** Illegal situation */
if (scur == NULL || if (scur == NULL ||
*(scur->scmd_cur_ptr_property) == NULL || *scur->scmd_cur_ptr_property == NULL ||
scur->scmd_cur_cmd == NULL) scur->scmd_cur_cmd == NULL)
{ {
/** Log error to debug */ /** Log error */
goto return_succp; goto return_succp;
} }
#if defined(SS_DEBUG)
prop_curr = *(scur->scmd_cur_ptr_property); prop_curr = *(scur->scmd_cur_ptr_property);
prop_next = prop_curr->rses_prop_next;
#endif
CHK_RSES_PROP(prop_curr);
CHK_MYSQL_SESCMD(scur->scmd_cur_cmd); CHK_MYSQL_SESCMD(scur->scmd_cur_cmd);
CHK_RSES_PROP(scur->scmd_cur_cmd->my_sescmd_prop); ss_dassert(prop_curr == mysql_sescmd_get_property(scur->scmd_cur_cmd));
ss_dassert(scur->scmd_cur_cmd->my_sescmd_prop == prop_curr); CHK_RSES_PROP(prop_curr);
/** Copy address of pointer to next property */
scur->scmd_cur_ptr_property = &(prop_curr->rses_prop_next);
prop_next = *scur->scmd_cur_ptr_property;
ss_dassert(prop_next == *(scur->scmd_cur_ptr_property));
/** If there is a next property move forward */ /** If there is a next property move forward */
if ((*scur->scmd_cur_ptr_property)->rses_prop_next != NULL) if (prop_next != NULL)
{ {
scur->scmd_cur_ptr_property = CHK_RSES_PROP(prop_next);
&((*scur->scmd_cur_ptr_property)->rses_prop_next); CHK_RSES_PROP((*(scur->scmd_cur_ptr_property)));
scur->scmd_cur_cmd =
&(*scur->scmd_cur_ptr_property)->rses_prop_data.sescmd; /** Get pointer to next property's sescmd */
scur->scmd_cur_cmd = rses_property_get_sescmd(prop_next);
ss_dassert(prop_next == scur->scmd_cur_cmd->my_sescmd_prop);
CHK_MYSQL_SESCMD(scur->scmd_cur_cmd);
CHK_RSES_PROP(scur->scmd_cur_cmd->my_sescmd_prop);
} }
else else
{ {
/** No more properties, can't proceed. */ /** No more properties, can't proceed. */
goto return_succp; goto return_succp;
} }
CHK_RSES_PROP((*(scur->scmd_cur_ptr_property)));
CHK_MYSQL_SESCMD(scur->scmd_cur_cmd);
ss_dassert(prop_next == *(scur->scmd_cur_ptr_property));
if (scur->scmd_cur_cmd != NULL) if (scur->scmd_cur_cmd != NULL)
{ {
succp = true; succp = true;
} }
else
{
/** Log error, sescmd shouldn't be NULL */
}
return_succp: return_succp:
return succp; return succp;
} }
@ -1572,3 +1797,63 @@ static rses_property_t* mysql_sescmd_get_property(
CHK_MYSQL_SESCMD(scmd); CHK_MYSQL_SESCMD(scmd);
return scmd->my_sescmd_prop; return scmd->my_sescmd_prop;
} }
static void tracelog_routed_query(
ROUTER_CLIENT_SES* rses,
char* funcname,
DCB* dcb,
GWBUF* buf)
{
unsigned char* packet = GWBUF_DATA(buf);
unsigned char packet_type = packet[4];
size_t len;
size_t buflen = GWBUF_LENGTH(buf);
char* querystr;
char* startpos = (char *)&packet[5];
backend_type_t be_type;
len = packet[0];
len += 255*packet[1];
len += 255*255*packet[2];
if (rses->rses_dcb[BE_MASTER] == dcb)
{
be_type = BE_MASTER;
}
else if (rses->rses_dcb[BE_SLAVE] == dcb)
{
be_type = BE_SLAVE;
}
else
{
be_type = BE_UNDEFINED;
}
if (packet_type == '\x03')
{
querystr = (char *)malloc(len);
memcpy(querystr, startpos, len-1);
querystr[len-1] = '\0';
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"%lu [%s] %d bytes long buf, \"%s\" -> %s:%d %s dcb %p",
pthread_self(),
funcname,
buflen,
querystr,
(be_type == BE_MASTER ?
rses->rses_backend[BE_MASTER]->backend_server->name :
(be_type == BE_SLAVE ?
rses->rses_backend[BE_SLAVE]->backend_server->name :
"Target DCB is neither of the backends. This is error")),
(be_type == BE_MASTER ?
rses->rses_backend[BE_MASTER]->backend_server->port :
(be_type == BE_SLAVE ?
rses->rses_backend[BE_SLAVE]->backend_server->port :
-1)),
STRBETYPE(be_type),
dcb)));
}
gwbuf_free(buf);
}

View File

@ -216,6 +216,10 @@ typedef enum skygw_chk_t {
((r) == DCB_ROLE_REQUEST_HANDLER ? "DCB_ROLE_REQUEST_HANDLER" : \ ((r) == DCB_ROLE_REQUEST_HANDLER ? "DCB_ROLE_REQUEST_HANDLER" : \
"UNKNOWN DCB ROLE")) "UNKNOWN DCB ROLE"))
#define STRBETYPE(t) ((t) == BE_MASTER ? "BE_MASTER" : \
((t) == BE_SLAVE ? "BE_SLAVE" : \
((t) == BE_UNDEFINED ? "BE_UNDEFINED" : \
"Unknown backend tpe")))
#define CHK_MLIST(l) { \ #define CHK_MLIST(l) { \
ss_info_dassert((l->mlist_chk_top == CHK_NUM_MLIST && \ ss_info_dassert((l->mlist_chk_top == CHK_NUM_MLIST && \