Merge pull request #47 from skysql/blr
Blr, Fix to but, #478, http://bugs.skysql.com/show_bug.cgi?id=478
This commit is contained in:
@ -1646,6 +1646,7 @@ void
|
|||||||
shutdown_server()
|
shutdown_server()
|
||||||
{
|
{
|
||||||
poll_shutdown();
|
poll_shutdown();
|
||||||
|
hkshutdown();
|
||||||
log_flush_shutdown();
|
log_flush_shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -42,6 +42,8 @@ static HKTASK *tasks = NULL;
|
|||||||
*/
|
*/
|
||||||
static SPINLOCK tasklock = SPINLOCK_INIT;
|
static SPINLOCK tasklock = SPINLOCK_INIT;
|
||||||
|
|
||||||
|
static int do_shutdown = 0;
|
||||||
|
|
||||||
static void hkthread(void *);
|
static void hkthread(void *);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -172,6 +174,8 @@ void *taskdata;
|
|||||||
|
|
||||||
for (;;)
|
for (;;)
|
||||||
{
|
{
|
||||||
|
if (do_shutdown)
|
||||||
|
return;
|
||||||
thread_millisleep(1000);
|
thread_millisleep(1000);
|
||||||
now = time(0);
|
now = time(0);
|
||||||
spinlock_acquire(&tasklock);
|
spinlock_acquire(&tasklock);
|
||||||
@ -194,3 +198,13 @@ void *taskdata;
|
|||||||
spinlock_release(&tasklock);
|
spinlock_release(&tasklock);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called to shutdown the housekeeper
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
hkshutdown()
|
||||||
|
{
|
||||||
|
do_shutdown = 1;
|
||||||
|
}
|
||||||
|
|||||||
@ -391,7 +391,7 @@ DCB *zombies = NULL;
|
|||||||
while (1)
|
while (1)
|
||||||
{
|
{
|
||||||
/* Process of the queue of waiting requests */
|
/* Process of the queue of waiting requests */
|
||||||
while (process_pollq(thread_id))
|
while (do_shutdown == 0 && process_pollq(thread_id))
|
||||||
{
|
{
|
||||||
if (thread_data)
|
if (thread_data)
|
||||||
thread_data[thread_id].state = THREAD_ZPROCESSING;
|
thread_data[thread_id].state = THREAD_ZPROCESSING;
|
||||||
@ -885,6 +885,20 @@ poll_bitmask()
|
|||||||
return &poll_mask;
|
return &poll_mask;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Display an entry from the spinlock statistics data
|
||||||
|
*
|
||||||
|
* @param dcb The DCB to print to
|
||||||
|
* @param desc Description of the statistic
|
||||||
|
* @param value The statistic value
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
spin_reporter(void *dcb, char *desc, int value)
|
||||||
|
{
|
||||||
|
dcb_printf((DCB *)dcb, "\t%-40s %d\n", desc, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Debug routine to print the polling statistics
|
* Debug routine to print the polling statistics
|
||||||
*
|
*
|
||||||
@ -922,6 +936,11 @@ int i;
|
|||||||
}
|
}
|
||||||
dcb_printf(dcb, "\t>= %d\t\t\t%d\n", MAXNFDS,
|
dcb_printf(dcb, "\t>= %d\t\t\t%d\n", MAXNFDS,
|
||||||
pollStats.n_fds[MAXNFDS-1]);
|
pollStats.n_fds[MAXNFDS-1]);
|
||||||
|
|
||||||
|
#if SPINLOCK_PROFILE
|
||||||
|
dcb_printf(dcb, "Event queue lock statistics:\n");
|
||||||
|
spinlock_stats(&pollqlock, spin_reporter, dcb);
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@ -47,4 +47,5 @@ typedef struct hktask {
|
|||||||
extern void hkinit();
|
extern void hkinit();
|
||||||
extern int hktask_add(char *name, void (*task)(void *), void *data, int frequency);
|
extern int hktask_add(char *name, void (*task)(void *), void *data, int frequency);
|
||||||
extern int hktask_remove(char *name);
|
extern int hktask_remove(char *name);
|
||||||
|
extern void hkshutdown();
|
||||||
#endif
|
#endif
|
||||||
|
|||||||
@ -31,7 +31,7 @@
|
|||||||
#include <thread.h>
|
#include <thread.h>
|
||||||
#include <stdbool.h>
|
#include <stdbool.h>
|
||||||
|
|
||||||
#define SPINLOCK_PROFILE 0
|
#define SPINLOCK_PROFILE 1
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The spinlock structure.
|
* The spinlock structure.
|
||||||
|
|||||||
@ -43,8 +43,8 @@
|
|||||||
* High and Low water marks for the slave dcb. These values can be overriden
|
* High and Low water marks for the slave dcb. These values can be overriden
|
||||||
* by the router options highwater and lowwater.
|
* by the router options highwater and lowwater.
|
||||||
*/
|
*/
|
||||||
#define DEF_LOW_WATER 20000
|
#define DEF_LOW_WATER 2000
|
||||||
#define DEF_HIGH_WATER 300000
|
#define DEF_HIGH_WATER 30000
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Some useful macros for examining the MySQL Response packets
|
* Some useful macros for examining the MySQL Response packets
|
||||||
|
|||||||
@ -224,6 +224,7 @@ typedef struct backend_ref_st {
|
|||||||
bref_state_t bref_state;
|
bref_state_t bref_state;
|
||||||
int bref_num_result_wait;
|
int bref_num_result_wait;
|
||||||
sescmd_cursor_t bref_sescmd_cur;
|
sescmd_cursor_t bref_sescmd_cur;
|
||||||
|
GWBUF* bref_pending_cmd; /*< For stmt which can't be routed due active sescmd execution */
|
||||||
#if defined(SS_DEBUG)
|
#if defined(SS_DEBUG)
|
||||||
skygw_chk_t bref_chk_tail;
|
skygw_chk_t bref_chk_tail;
|
||||||
#endif
|
#endif
|
||||||
|
|||||||
@ -1642,6 +1642,8 @@ void protocol_archive_srv_command(
|
|||||||
server_command_t* h1;
|
server_command_t* h1;
|
||||||
int len = 0;
|
int len = 0;
|
||||||
|
|
||||||
|
CHK_PROTOCOL(p);
|
||||||
|
|
||||||
spinlock_acquire(&p->protocol_lock);
|
spinlock_acquire(&p->protocol_lock);
|
||||||
|
|
||||||
if (p->protocol_state != MYSQL_PROTOCOL_ACTIVE)
|
if (p->protocol_state != MYSQL_PROTOCOL_ACTIVE)
|
||||||
@ -1692,6 +1694,7 @@ void protocol_archive_srv_command(
|
|||||||
|
|
||||||
retblock:
|
retblock:
|
||||||
spinlock_release(&p->protocol_lock);
|
spinlock_release(&p->protocol_lock);
|
||||||
|
CHK_PROTOCOL(p);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -47,11 +47,12 @@ CLIOBJ=$(CLISRCS:.c=.o)
|
|||||||
SRCS=$(TESTSRCS) $(READCONSRCS) $(DEBUGCLISRCS) cli.c
|
SRCS=$(TESTSRCS) $(READCONSRCS) $(DEBUGCLISRCS) cli.c
|
||||||
OBJ=$(SRCS:.c=.o)
|
OBJ=$(SRCS:.c=.o)
|
||||||
LIBS=$(UTILSPATH)/skygw_utils.o -lssl -llog_manager
|
LIBS=$(UTILSPATH)/skygw_utils.o -lssl -llog_manager
|
||||||
MODULES= libdebugcli.so libreadconnroute.so libtestroute.so libcli.so
|
MODULES= libdebugcli.so libreadconnroute.so libtestroute.so libcli.so libbinlogrouter.so
|
||||||
|
|
||||||
|
|
||||||
all: $(MODULES)
|
all: $(MODULES)
|
||||||
(cd readwritesplit; make)
|
(cd readwritesplit; make)
|
||||||
|
(cd binlog; make)
|
||||||
|
|
||||||
libtestroute.so: $(TESTOBJ)
|
libtestroute.so: $(TESTOBJ)
|
||||||
$(CC) $(LDFLAGS) $(TESTOBJ) $(LIBS) -o $@
|
$(CC) $(LDFLAGS) $(TESTOBJ) $(LIBS) -o $@
|
||||||
@ -68,12 +69,16 @@ libcli.so: $(CLIOBJ)
|
|||||||
libreadwritesplit.so:
|
libreadwritesplit.so:
|
||||||
(cd readwritesplit; touch depend.mk ; make; cp $@ ..)
|
(cd readwritesplit; touch depend.mk ; make; cp $@ ..)
|
||||||
|
|
||||||
|
libbinlogrouter.so:
|
||||||
|
(cd binlog; touch depend.mk ; make; cp $@ ..)
|
||||||
|
|
||||||
.c.o:
|
.c.o:
|
||||||
$(CC) $(CFLAGS) $< -o $@
|
$(CC) $(CFLAGS) $< -o $@
|
||||||
|
|
||||||
clean:
|
clean:
|
||||||
$(DEL) $(OBJ) $(MODULES)
|
$(DEL) $(OBJ) $(MODULES)
|
||||||
(cd readwritesplit; touch depend.mk; make clean)
|
(cd readwritesplit; touch depend.mk; make clean)
|
||||||
|
(cd binlog; touch depend.mk; make clean)
|
||||||
|
|
||||||
tags:
|
tags:
|
||||||
ctags $(SRCS) $(HDRS)
|
ctags $(SRCS) $(HDRS)
|
||||||
@ -83,6 +88,7 @@ depend:
|
|||||||
@$(DEL) depend.mk
|
@$(DEL) depend.mk
|
||||||
cc -M $(CFLAGS) $(SRCS) > depend.mk
|
cc -M $(CFLAGS) $(SRCS) > depend.mk
|
||||||
(cd readwritesplit; touch depend.mk ; make depend)
|
(cd readwritesplit; touch depend.mk ; make depend)
|
||||||
|
(cd binlog; touch depend.mk ; make depend)
|
||||||
|
|
||||||
install: $(MODULES)
|
install: $(MODULES)
|
||||||
install -D $(MODULES) $(DEST)/modules
|
install -D $(MODULES) $(DEST)/modules
|
||||||
|
|||||||
@ -55,21 +55,57 @@
|
|||||||
#include <skygw_utils.h>
|
#include <skygw_utils.h>
|
||||||
#include <log_manager.h>
|
#include <log_manager.h>
|
||||||
|
|
||||||
|
#include <rdtsc.h>
|
||||||
|
|
||||||
/* Temporary requirement for auth data */
|
/* Temporary requirement for auth data */
|
||||||
#include <mysql_client_server_protocol.h>
|
#include <mysql_client_server_protocol.h>
|
||||||
|
|
||||||
|
#define SAMPLE_COUNT 10000
|
||||||
|
CYCLES samples[10][SAMPLE_COUNT];
|
||||||
|
int sample_index[10] = { 0, 0, 0 };
|
||||||
|
|
||||||
|
#define LOGD_SLAVE_CATCHUP1 0
|
||||||
|
#define LOGD_SLAVE_CATCHUP2 1
|
||||||
|
#define LOGD_DISTRIBUTE 2
|
||||||
|
#define LOGD_FILE_FLUSH 3
|
||||||
|
|
||||||
|
SPINLOCK logspin = SPINLOCK_INIT;
|
||||||
|
|
||||||
|
void
|
||||||
|
log_duration(int sample, CYCLES duration)
|
||||||
|
{
|
||||||
|
char fname[100];
|
||||||
|
int i;
|
||||||
|
FILE *fp;
|
||||||
|
|
||||||
|
spinlock_acquire(&logspin);
|
||||||
|
samples[sample][sample_index[sample]++] = duration;
|
||||||
|
if (sample_index[sample] == SAMPLE_COUNT)
|
||||||
|
{
|
||||||
|
sprintf(fname, "binlog_profile.%d", sample);
|
||||||
|
if ((fp = fopen(fname, "a")) != NULL)
|
||||||
|
{
|
||||||
|
for (i = 0; i < SAMPLE_COUNT; i++)
|
||||||
|
fprintf(fp, "%ld\n", samples[sample][i]);
|
||||||
|
fclose(fp);
|
||||||
|
}
|
||||||
|
sample_index[sample] = 0;
|
||||||
|
}
|
||||||
|
spinlock_release(&logspin);
|
||||||
|
}
|
||||||
|
|
||||||
extern int lm_enabled_logfiles_bitmask;
|
extern int lm_enabled_logfiles_bitmask;
|
||||||
|
|
||||||
static GWBUF *blr_make_query(char *statement);
|
static GWBUF *blr_make_query(char *statement);
|
||||||
static GWBUF *blr_make_registration(ROUTER_INSTANCE *router);
|
static GWBUF *blr_make_registration(ROUTER_INSTANCE *router);
|
||||||
static GWBUF *blr_make_binlog_dump(ROUTER_INSTANCE *router);
|
static GWBUF *blr_make_binlog_dump(ROUTER_INSTANCE *router);
|
||||||
static void encode_value(unsigned char *data, unsigned int value, int len);
|
void encode_value(unsigned char *data, unsigned int value, int len);
|
||||||
static void blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt);
|
void blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt);
|
||||||
static void blr_rotate_event(ROUTER_INSTANCE *router, uint8_t *pkt, REP_HEADER *hdr);
|
static void blr_rotate_event(ROUTER_INSTANCE *router, uint8_t *pkt, REP_HEADER *hdr);
|
||||||
static void blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr);
|
void blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr);
|
||||||
static void *CreateMySQLAuthData(char *username, char *password, char *database);
|
static void *CreateMySQLAuthData(char *username, char *password, char *database);
|
||||||
static void blr_extract_header(uint8_t *pkt, REP_HEADER *hdr);
|
void blr_extract_header(uint8_t *pkt, REP_HEADER *hdr);
|
||||||
static uint32_t extract_field(uint8_t *src, int bits);
|
inline uint32_t extract_field(uint8_t *src, int bits);
|
||||||
static void blr_log_packet(logfile_id_t file, char *msg, uint8_t *ptr, int len);
|
static void blr_log_packet(logfile_id_t file, char *msg, uint8_t *ptr, int len);
|
||||||
|
|
||||||
static int keepalive = 1;
|
static int keepalive = 1;
|
||||||
@ -460,7 +496,7 @@ int len = 0x1b;
|
|||||||
* @param value The value to pack
|
* @param value The value to pack
|
||||||
* @param len Number of bits to encode value into
|
* @param len Number of bits to encode value into
|
||||||
*/
|
*/
|
||||||
static void
|
void
|
||||||
encode_value(unsigned char *data, unsigned int value, int len)
|
encode_value(unsigned char *data, unsigned int value, int len)
|
||||||
{
|
{
|
||||||
while (len > 0)
|
while (len > 0)
|
||||||
@ -478,7 +514,7 @@ encode_value(unsigned char *data, unsigned int value, int len)
|
|||||||
* @param router The router instance
|
* @param router The router instance
|
||||||
* @param pkt The binlog records
|
* @param pkt The binlog records
|
||||||
*/
|
*/
|
||||||
static void
|
void
|
||||||
blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
|
blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
|
||||||
{
|
{
|
||||||
uint8_t *msg = NULL, *ptr, *pdata;
|
uint8_t *msg = NULL, *ptr, *pdata;
|
||||||
@ -766,7 +802,9 @@ static REP_HEADER phdr;
|
|||||||
{
|
{
|
||||||
ss_dassert(pkt_length == 0);
|
ss_dassert(pkt_length == 0);
|
||||||
}
|
}
|
||||||
|
{ CYCLES start = rdtsc();
|
||||||
blr_file_flush(router);
|
blr_file_flush(router);
|
||||||
|
log_duration(LOGD_FILE_FLUSH, rdtsc() - start); }
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -775,7 +813,7 @@ static REP_HEADER phdr;
|
|||||||
* @param pkt The incoming packet in a GWBUF chain
|
* @param pkt The incoming packet in a GWBUF chain
|
||||||
* @param hdr The packet header to populate
|
* @param hdr The packet header to populate
|
||||||
*/
|
*/
|
||||||
static void
|
void
|
||||||
blr_extract_header(uint8_t *ptr, REP_HEADER *hdr)
|
blr_extract_header(uint8_t *ptr, REP_HEADER *hdr)
|
||||||
{
|
{
|
||||||
|
|
||||||
@ -796,10 +834,10 @@ blr_extract_header(uint8_t *ptr, REP_HEADER *hdr)
|
|||||||
* @param src The raw packet source
|
* @param src The raw packet source
|
||||||
* @param birs The number of bits to extract (multiple of 8)
|
* @param birs The number of bits to extract (multiple of 8)
|
||||||
*/
|
*/
|
||||||
static uint32_t
|
inline uint32_t
|
||||||
extract_field(uint8_t *src, int bits)
|
extract_field(register uint8_t *src, int bits)
|
||||||
{
|
{
|
||||||
uint32_t rval = 0, shift = 0;
|
register uint32_t rval = 0, shift = 0;
|
||||||
|
|
||||||
while (bits > 0)
|
while (bits > 0)
|
||||||
{
|
{
|
||||||
@ -884,14 +922,16 @@ MYSQL_session *auth_info;
|
|||||||
* @param hdr The replication event header
|
* @param hdr The replication event header
|
||||||
* @param ptr The raw replication event data
|
* @param ptr The raw replication event data
|
||||||
*/
|
*/
|
||||||
static void
|
void
|
||||||
blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
|
blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
|
||||||
{
|
{
|
||||||
GWBUF *pkt;
|
GWBUF *pkt;
|
||||||
uint8_t *buf;
|
uint8_t *buf;
|
||||||
ROUTER_SLAVE *slave;
|
ROUTER_SLAVE *slave;
|
||||||
int action;
|
int action;
|
||||||
|
CYCLES entry;
|
||||||
|
|
||||||
|
entry = rdtsc();
|
||||||
spinlock_acquire(&router->lock);
|
spinlock_acquire(&router->lock);
|
||||||
slave = router->slaves;
|
slave = router->slaves;
|
||||||
while (slave)
|
while (slave)
|
||||||
@ -945,12 +985,16 @@ int action;
|
|||||||
spinlock_acquire(&slave->catch_lock);
|
spinlock_acquire(&slave->catch_lock);
|
||||||
if (slave->overrun)
|
if (slave->overrun)
|
||||||
{
|
{
|
||||||
|
CYCLES cycle_start, cycles;
|
||||||
slave->stats.n_overrun++;
|
slave->stats.n_overrun++;
|
||||||
slave->overrun = 0;
|
slave->overrun = 0;
|
||||||
spinlock_release(&router->lock);
|
spinlock_release(&router->lock);
|
||||||
slave->cstate &= ~(CS_UPTODATE|CS_DIST);
|
slave->cstate &= ~(CS_UPTODATE|CS_DIST);
|
||||||
spinlock_release(&slave->catch_lock);
|
spinlock_release(&slave->catch_lock);
|
||||||
|
cycle_start = rdtsc();
|
||||||
blr_slave_catchup(router, slave);
|
blr_slave_catchup(router, slave);
|
||||||
|
cycles = rdtsc() - cycle_start;
|
||||||
|
log_duration(LOGD_SLAVE_CATCHUP2, cycles);
|
||||||
spinlock_acquire(&router->lock);
|
spinlock_acquire(&router->lock);
|
||||||
slave = router->slaves;
|
slave = router->slaves;
|
||||||
if (slave)
|
if (slave)
|
||||||
@ -983,6 +1027,7 @@ int action;
|
|||||||
*/
|
*/
|
||||||
if (slave->cstate & CS_UPTODATE)
|
if (slave->cstate & CS_UPTODATE)
|
||||||
{
|
{
|
||||||
|
CYCLES cycle_start, cycles;
|
||||||
spinlock_release(&router->lock);
|
spinlock_release(&router->lock);
|
||||||
LOGIF(LD, (skygw_log_write_flush(LOGFILE_DEBUG,
|
LOGIF(LD, (skygw_log_write_flush(LOGFILE_DEBUG,
|
||||||
"Force slave %d into catchup mode %s@%d\n",
|
"Force slave %d into catchup mode %s@%d\n",
|
||||||
@ -991,7 +1036,10 @@ int action;
|
|||||||
spinlock_acquire(&slave->catch_lock);
|
spinlock_acquire(&slave->catch_lock);
|
||||||
slave->cstate &= ~(CS_UPTODATE|CS_DIST);
|
slave->cstate &= ~(CS_UPTODATE|CS_DIST);
|
||||||
spinlock_release(&slave->catch_lock);
|
spinlock_release(&slave->catch_lock);
|
||||||
|
cycle_start = rdtsc();
|
||||||
blr_slave_catchup(router, slave);
|
blr_slave_catchup(router, slave);
|
||||||
|
cycles = rdtsc() - cycle_start;
|
||||||
|
log_duration(LOGD_SLAVE_CATCHUP1, cycles);
|
||||||
spinlock_acquire(&router->lock);
|
spinlock_acquire(&router->lock);
|
||||||
slave = router->slaves;
|
slave = router->slaves;
|
||||||
if (slave)
|
if (slave)
|
||||||
@ -1005,6 +1053,7 @@ int action;
|
|||||||
slave = slave->next;
|
slave = slave->next;
|
||||||
}
|
}
|
||||||
spinlock_release(&router->lock);
|
spinlock_release(&router->lock);
|
||||||
|
log_duration(LOGD_DISTRIBUTE, rdtsc() - entry);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
|
|||||||
@ -1957,6 +1957,26 @@ static int routeQuery(
|
|||||||
|
|
||||||
if (succp) /*< Have DCB of the target backend */
|
if (succp) /*< Have DCB of the target backend */
|
||||||
{
|
{
|
||||||
|
backend_ref_t* bref;
|
||||||
|
sescmd_cursor_t* scur;
|
||||||
|
|
||||||
|
bref = get_bref_from_dcb(router_cli_ses, target_dcb);
|
||||||
|
scur = &bref->bref_sescmd_cur;
|
||||||
|
/**
|
||||||
|
* Store current stmt if execution of previous session command
|
||||||
|
* haven't completed yet. Note that according to MySQL protocol
|
||||||
|
* there can only be one such non-sescmd stmt at the time.
|
||||||
|
*/
|
||||||
|
if (sescmd_cursor_is_active(scur))
|
||||||
|
{
|
||||||
|
ss_dassert(bref->bref_pending_cmd == NULL);
|
||||||
|
bref->bref_pending_cmd = gwbuf_clone(querybuf);
|
||||||
|
|
||||||
|
rses_end_locked_router_action(router_cli_ses);
|
||||||
|
ret = 1;
|
||||||
|
goto retblock;
|
||||||
|
}
|
||||||
|
|
||||||
if ((ret = target_dcb->func.write(target_dcb, gwbuf_clone(querybuf))) == 1)
|
if ((ret = target_dcb->func.write(target_dcb, gwbuf_clone(querybuf))) == 1)
|
||||||
{
|
{
|
||||||
backend_ref_t* bref;
|
backend_ref_t* bref;
|
||||||
@ -2295,6 +2315,33 @@ static void clientReply (
|
|||||||
|
|
||||||
ss_dassert(succp);
|
ss_dassert(succp);
|
||||||
}
|
}
|
||||||
|
else if (bref->bref_pending_cmd != NULL) /*< non-sescmd is waiting to be routed */
|
||||||
|
{
|
||||||
|
int ret;
|
||||||
|
|
||||||
|
CHK_GWBUF(bref->bref_pending_cmd);
|
||||||
|
|
||||||
|
if ((ret = bref->bref_dcb->func.write(bref->bref_dcb,
|
||||||
|
gwbuf_clone(bref->bref_pending_cmd))) == 1)
|
||||||
|
{
|
||||||
|
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance;
|
||||||
|
atomic_add(&inst->stats.n_queries, 1);
|
||||||
|
/**
|
||||||
|
* Add one query response waiter to backend reference
|
||||||
|
*/
|
||||||
|
bref_set_state(bref, BREF_QUERY_ACTIVE);
|
||||||
|
bref_set_state(bref, BREF_WAITING_RESULT);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
LOGIF(LE, (skygw_log_write_flush(
|
||||||
|
LOGFILE_ERROR,
|
||||||
|
"Error : Routing query \"%s\" failed.",
|
||||||
|
bref->bref_pending_cmd)));
|
||||||
|
}
|
||||||
|
gwbuf_free(bref->bref_pending_cmd);
|
||||||
|
bref->bref_pending_cmd = NULL;
|
||||||
|
}
|
||||||
/** Unlock router session */
|
/** Unlock router session */
|
||||||
rses_end_locked_router_action(router_cli_ses);
|
rses_end_locked_router_action(router_cli_ses);
|
||||||
|
|
||||||
@ -3656,6 +3703,14 @@ static bool route_session_write(
|
|||||||
{
|
{
|
||||||
succp = false;
|
succp = false;
|
||||||
}
|
}
|
||||||
|
else if (LOG_IS_ENABLED(LOGFILE_TRACE))
|
||||||
|
{
|
||||||
|
LOGIF(LT, (skygw_log_write(
|
||||||
|
LOGFILE_TRACE,
|
||||||
|
"Wrote to %s:%d",
|
||||||
|
backend_ref[i].bref_backend->backend_server->name,
|
||||||
|
backend_ref[i].bref_backend->backend_server->port)));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
rses_end_locked_router_action(router_cli_ses);
|
rses_end_locked_router_action(router_cli_ses);
|
||||||
|
|||||||
Reference in New Issue
Block a user