Merge branch 'develop' into schemarouter_refresh

Conflicts:
	Documentation/Tutorials/Replication-Proxy-Binlog-Router-Tutorial.md
This commit is contained in:
Markus Makela
2015-08-08 11:25:54 +03:00
72 changed files with 2637 additions and 2056 deletions

View File

@ -108,6 +108,9 @@ static void stats_func(void *);
static bool rses_begin_locked_router_action(ROUTER_SLAVE *);
static void rses_end_locked_router_action(ROUTER_SLAVE *);
void my_uuid_init(ulong seed1, ulong seed2);
void my_uuid(char *guid);
GWBUF *blr_cache_read_response(ROUTER_INSTANCE *router, char *response);
static SPINLOCK instlock;
static ROUTER_INSTANCE *instances;
@ -170,9 +173,45 @@ createInstance(SERVICE *service, char **options)
ROUTER_INSTANCE *inst;
char *value, *name;
int i;
unsigned char *defuuid;
char *defuuid;
if ((inst = calloc(1, sizeof(ROUTER_INSTANCE))) == NULL) {
if(service->credentials.name == NULL ||
service->credentials.authdata == NULL)
{
skygw_log_write(LE,"%s: Error: Service is missing user credentials."
" Add the missing username or passwd parameter to the service.",
service->name);
return NULL;
}
if(options == NULL || options[0] == NULL)
{
skygw_log_write(LE,
"%s: Error: No router options supplied for binlogrouter",
service->name);
return NULL;
}
/*
* We only support one server behind this router, since the server is
* the master from which we replicate binlog records. Therefore check
* that only one server has been defined.
*
* A later improvement will be to define multiple servers and have the
* router use the information that is supplied by the monitor to find
* which of these servers is currently the master and replicate from
* that server.
*/
if (service->dbref == NULL || service->dbref->next != NULL)
{
skygw_log_write(LE,
"%s: Error : Exactly one database server may be "
"for use with the binlog router.",
service->name);
return NULL;
}
if ((inst = calloc(1, sizeof(ROUTER_INSTANCE))) == NULL) {
return NULL;
}
@ -215,25 +254,6 @@ unsigned char *defuuid;
defuuid[12], defuuid[13], defuuid[14], defuuid[15]);
}
/*
* We only support one server behind this router, since the server is
* the master from which we replicate binlog records. Therefore check
* that only one server has been defined.
*
* A later improvement will be to define multiple servers and have the
* router use the information that is supplied by the monitor to find
* which of these servers is currently the master and replicate from
* that server.
*/
if (service->dbref == NULL || service->dbref->next != NULL)
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"Error : Exactly one database server may be "
"for use with the binlog router.")));
}
/*
* Process the options.
* We have an array of attrbute values passed to us that we must
@ -364,7 +384,7 @@ unsigned char *defuuid;
else
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR, "%s: No router options supplied for binlogrouter",
LOGFILE_ERROR, "%s: Error: No router options supplied for binlogrouter",
service->name)));
}
@ -598,7 +618,7 @@ ROUTER_SLAVE *slave = (ROUTER_SLAVE *)router_session;
LOGFILE_MESSAGE,
"%s: Master %s disconnected after %ld seconds. "
"%d events read,",
router->service->name, router->master->remote,
router->service->name, router->service->dbref->server->name,
time(0) - router->connect_time, router->stats.n_binlogs_ses)));
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
@ -832,7 +852,7 @@ struct tm tm;
if (router_inst->lastEventTimestamp)
{
localtime_r(&router_inst->lastEventTimestamp, &tm);
localtime_r((const time_t*)&router_inst->lastEventTimestamp, &tm);
asctime_r(&tm, buf);
dcb_printf(dcb, "\tLast binlog event timestamp: %ld (%s)\n",
router_inst->lastEventTimestamp, buf);
@ -960,7 +980,7 @@ struct tm tm;
if (session->lastEventTimestamp
&& router_inst->lastEventTimestamp)
{
localtime_r(&session->lastEventTimestamp, &tm);
localtime_r((const time_t*)&session->lastEventTimestamp, &tm);
asctime_r(&tm, buf);
dcb_printf(dcb, "\t\tLast binlog event timestamp %u, %s", session->lastEventTimestamp, buf);
dcb_printf(dcb, "\t\tSeconds behind master %u\n", router_inst->lastEventTimestamp - session->lastEventTimestamp);
@ -1058,7 +1078,8 @@ static void
errorReply(ROUTER *instance, void *router_session, GWBUF *message, DCB *backend_dcb, error_action_t action, bool *succp)
{
ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance;
int error, len;
int error;
socklen_t len;
char msg[85], *errmsg;
if (action == ERRACT_RESET)
@ -1101,7 +1122,7 @@ char msg[85], *errmsg;
LOGFILE_MESSAGE,
"%s: Master %s disconnected after %ld seconds. "
"%d events read.",
router->service->name, router->master->remote,
router->service->name, router->service->dbref->server->name,
time(0) - router->connect_time, router->stats.n_binlogs_ses)));
blr_master_reconnect(router);
}
@ -1211,10 +1232,10 @@ int len;
snprintf(result, 1000,
"Uptime: %u Threads: %u Events: %u Slaves: %u Master State: %s",
time(0) - router->connect_time,
config_threadcount(),
router->stats.n_binlogs_ses,
router->stats.n_slaves,
(unsigned int)(time(0) - router->connect_time),
(unsigned int)config_threadcount(),
(unsigned int)router->stats.n_binlogs_ses,
(unsigned int)router->stats.n_slaves,
blrm_states[router->master_state]);
if ((ret = gwbuf_alloc(4 + strlen(result))) == NULL)
return 0;
@ -1339,3 +1360,24 @@ GWBUF *errbuf = NULL;
return dcb->func.write(dcb, errbuf);
}
/**
* Extract a numeric field from a packet of the specified number of bits
*
* @param src The raw packet source
* @param birs The number of bits to extract (multiple of 8)
*/
uint32_t
extract_field(uint8_t *src, int bits)
{
uint32_t rval = 0, shift = 0;
while (bits > 0)
{
rval |= (*src++) << shift;
shift += 8;
bits -= 8;
}
return rval;
}

View File

@ -46,7 +46,7 @@
#include <blr.h>
#include <dcb.h>
#include <spinlock.h>
#include <gwdirs.h>
#include <skygw_types.h>
#include <skygw_utils.h>
#include <log_manager.h>
@ -58,7 +58,6 @@ extern __thread log_info_t tls_log_info;
static int blr_file_create(ROUTER_INSTANCE *router, char *file);
static void blr_file_append(ROUTER_INSTANCE *router, char *file);
static uint32_t extract_field(uint8_t *src, int bits);
static void blr_log_header(logfile_id_t file, char *msg, uint8_t *ptr);
/**
@ -599,26 +598,6 @@ blr_close_binlog(ROUTER_INSTANCE *router, BLFILE *file)
free(file);
}
/**
* Extract a numeric field from a packet of the specified number of bits
*
* @param src The raw packet source
* @param birs The number of bits to extract (multiple of 8)
*/
static uint32_t
extract_field(uint8_t *src, int bits)
{
uint32_t rval = 0, shift = 0;
while (bits > 0)
{
rval |= (*src++) << shift;
shift += 8;
bits -= 8;
}
return rval;
}
/**
* Log the event header of binlog event
*
@ -671,7 +650,7 @@ blr_cache_response(ROUTER_INSTANCE *router, char *response, GWBUF *buf)
char path[PATH_MAX+1], *ptr;
int fd;
strcpy(path,get_datadir());
strncpy(path,get_datadir(),PATH_MAX);
strncat(path,"/",PATH_MAX);
strncat(path, router->service->name, PATH_MAX);
@ -706,7 +685,7 @@ char path[PATH_MAX+1], *ptr;
int fd;
GWBUF *buf;
strcpy(path, get_datadir());
strncpy(path, get_datadir(),PATH_MAX);
strncat(path, "/", PATH_MAX);
strncat(path, router->service->name, PATH_MAX);
strncat(path, "/.cache/", PATH_MAX);

View File

@ -77,11 +77,11 @@ static int blr_rotate_event(ROUTER_INSTANCE *router, uint8_t *pkt, REP_HEADER *
void blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr);
static void *CreateMySQLAuthData(char *username, char *password, char *database);
void blr_extract_header(uint8_t *pkt, REP_HEADER *hdr);
inline uint32_t extract_field(uint8_t *src, int bits);
static void blr_log_packet(logfile_id_t file, char *msg, uint8_t *ptr, int len);
static void blr_master_close(ROUTER_INSTANCE *);
static char *blr_extract_column(GWBUF *buf, int col);
void blr_cache_response(ROUTER_INSTANCE *router, char *response, GWBUF *buf);
void poll_fake_write_event(DCB *dcb);
static int keepalive = 1;
/**
@ -92,8 +92,9 @@ static int keepalive = 1;
* @param router The router instance
*/
void
blr_start_master(ROUTER_INSTANCE *router)
blr_start_master(void* data)
{
ROUTER_INSTANCE *router = (ROUTER_INSTANCE*)data;
DCB *client;
GWBUF *buf;
@ -156,7 +157,7 @@ GWBUF *buf;
LOGIF(LM,(skygw_log_write(
LOGFILE_MESSAGE,
"%s: attempting to connect to master server %s.",
router->service->name, router->master->remote)));
router->service->name, router->service->dbref->server->name)));
router->connect_time = time(0);
if (setsockopt(router->master->fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive , sizeof(keepalive )))
@ -586,7 +587,7 @@ char query[128];
"%s: Request binlog records from %s at "
"position %d from master server %s.",
router->service->name, router->binlog_name,
router->binlog_position, router->master->remote)));
router->binlog_position, router->service->dbref->server->name)));
break;
case BLRM_BINLOGDUMP:
// Main body, we have received a binlog record from the master
@ -728,7 +729,6 @@ int no_residual = 1;
int preslen = -1;
int prev_length = -1;
int n_bufs = -1, pn_bufs = -1;
static REP_HEADER phdr;
/*
* Prepend any residual buffer to the buffer chain we have
@ -914,7 +914,7 @@ static REP_HEADER phdr;
}
break;
}
phdr = hdr;
if (hdr.ok == 0)
{
int event_limit;
@ -1160,26 +1160,6 @@ blr_extract_header(register uint8_t *ptr, register REP_HEADER *hdr)
hdr->flags = EXTRACT16(&ptr[22]);
}
/**
* Extract a numeric field from a packet of the specified number of bits
*
* @param src The raw packet source
* @param bits The number of bits to extract (multiple of 8)
*/
inline uint32_t
extract_field(register uint8_t *src, int bits)
{
register uint32_t rval = 0, shift = 0;
while (bits > 0)
{
rval |= (*src++) << shift;
shift += 8;
bits -= 8;
}
return rval;
}
/**
* Process a binlog rotate event.
*
@ -1249,8 +1229,8 @@ MYSQL_session *auth_info;
if ((auth_info = calloc(1, sizeof(MYSQL_session))) == NULL)
return NULL;
strncpy(auth_info->user, username,MYSQL_USER_MAXLEN+1);
strncpy(auth_info->db, database,MYSQL_DATABASE_MAXLEN+1);
strncpy(auth_info->user, username,MYSQL_USER_MAXLEN);
strncpy(auth_info->db, database,MYSQL_DATABASE_MAXLEN);
gw_sha1_str((const uint8_t *)password, strlen(password), auth_info->client_sha1);
return auth_info;

View File

@ -60,7 +60,6 @@
#include <log_manager.h>
#include <version.h>
static uint32_t extract_field(uint8_t *src, int bits);
static void encode_value(unsigned char *data, unsigned int value, int len);
static int blr_slave_query(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue);
static int blr_slave_replay(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *master);
@ -86,7 +85,7 @@ static int blr_slave_send_disconnected_server(ROUTER_INSTANCE *router, ROUTER_SL
static int blr_slave_disconnect_all(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
static int blr_slave_disconnect_server(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, int server_id);
static int blr_slave_send_ok(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave);
void poll_fake_write_event(DCB *dcb);
extern int lm_enabled_logfiles_bitmask;
extern size_t log_ses_count[];
extern __thread log_info_t tls_log_info;
@ -810,7 +809,7 @@ int len, actual_len, col_len, seqno, ncols, i;
strncpy((char *)ptr, column, col_len); // Result string
ptr += col_len;
sprintf(column, "%s", router->master->remote ? router->master->remote : "");
sprintf(column, "%s", router->service->dbref->server->name ? router->service->dbref->server->name : "");
col_len = strlen(column);
*ptr++ = col_len; // Length of result string
strncpy((char *)ptr, column, col_len); // Result string
@ -1122,10 +1121,9 @@ blr_slave_register(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
{
GWBUF *resp;
uint8_t *ptr;
int len, slen;
int slen;
ptr = GWBUF_DATA(queue);
len = extract_field(ptr, 24);
ptr += 4; // Skip length and sequence number
if (*ptr++ != COM_REGISTER_SLAVE)
return 0;
@ -1194,7 +1192,7 @@ blr_slave_binlog_dump(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue
{
GWBUF *resp;
uint8_t *ptr;
int len, flags, serverid, rval, binlognamelen;
int len, rval, binlognamelen;
REP_HEADER hdr;
uint32_t chksum;
@ -1222,9 +1220,7 @@ uint32_t chksum;
slave->binlog_pos = extract_field(ptr, 32);
ptr += 4;
flags = extract_field(ptr, 16);
ptr += 2;
serverid = extract_field(ptr, 32);
ptr += 4;
strncpy(slave->binlogfile, (char *)ptr, binlognamelen);
slave->binlogfile[binlognamelen] = 0;
@ -1306,28 +1302,6 @@ uint32_t chksum;
return rval;
}
/**
* Extract a numeric field from a packet of the specified number of bits,
* the number of bits must be a multiple of 8.
*
* @param src The raw packet source
* @param bits The number of bits to extract (multiple of 8)
* @return The extracted value
*/
static uint32_t
extract_field(uint8_t *src, int bits)
{
uint32_t rval = 0, shift = 0;
while (bits > 0)
{
rval |= (*src++) << shift;
shift += 8;
bits -= 8;
}
return rval;
}
/**
* Encode a value into a number of bits in a MySQL packet
*
@ -1689,7 +1663,7 @@ int len = EXTRACT24(ptr + 9); // Extract the event length
len = BINLOG_FNAMELEN;
ptr += 19; // Skip header
slave->binlog_pos = extract_field(ptr, 32);
slave->binlog_pos += (extract_field(ptr+4, 32) << 32);
slave->binlog_pos += (((uint64_t)extract_field(ptr+4, 32)) << 32);
memcpy(slave->binlogfile, ptr + 8, len);
slave->binlogfile[len] = 0;
}
@ -2055,7 +2029,6 @@ blr_slave_disconnect_all(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave)
uint8_t *ptr;
int len, seqno;
GWBUF *pkt;
int n = 1;
/* preparing output result */
blr_slave_send_fieldcount(router, slave, 2);
@ -2105,7 +2078,7 @@ blr_slave_disconnect_all(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave)
strncpy((char *)ptr, state, strlen(state)); // Result string
ptr += strlen(state);
n = slave->dcb->func.write(slave->dcb, pkt);
slave->dcb->func.write(slave->dcb, pkt);
/* force session close*/
router_obj->closeSession(router->service->router_instance, sptr);

View File

@ -43,6 +43,7 @@
* 29/05/14 Mark Riddoch Add Filter support
* 16/10/14 Mark Riddoch Add show eventq
* 05/03/15 Massimiliano Pinto Added enable/disable feedback
* 27/05/15 Martin Brampton Add show persistent [server]
*
* @endverbatim
*/
@ -154,6 +155,10 @@ struct subcommand showoptions[] = {
"Show the monitors that are configured",
"Show the monitors that are configured",
{0, 0, 0} },
{ "persistent", 1, dprintPersistentDCBs,
"Show persistent pool for a named server, e.g. show persistent dbnode1",
"Show persistent pool for a server, e.g. show persistent 0x485390. The address may also be replaced with the server name from the configuration file",
{ARG_TYPE_SERVER, 0, 0} },
{ "server", 1, dprintServer,
"Show details for a named server, e.g. show server dbnode1",
"Show details for a server, e.g. show server 0x485390. The address may also be repalced with the server name from the configuration file",

View File

@ -67,6 +67,7 @@
* 06/03/2014 Massimiliano Pinto Server connection counter is now updated in closeSession
* 24/06/2014 Massimiliano Pinto New rules for selecting the Master server
* 27/06/2014 Mark Riddoch Addition of server weighting
* 11/06/2015 Martin Brampton Remove decrement n_current (moved to dcb.c)
*
* @endverbatim
*/
@ -657,7 +658,6 @@ DCB* backend_dcb;
if (rses_begin_locked_router_action(router_cli_ses))
{
/* decrease server current connection counter */
atomic_add(&router_cli_ses->backend->server->stats.n_current, -1);
backend_dcb = router_cli_ses->backend_dcb;
router_cli_ses->backend_dcb = NULL;
@ -827,12 +827,7 @@ clientReply(
GWBUF *queue,
DCB *backend_dcb)
{
DCB *client ;
client = backend_dcb->session->client;
ss_dassert(client != NULL);
ss_dassert(backend_dcb->session->client != NULL);
SESSION_ROUTE_REPLY(backend_dcb->session, queue);
}

View File

@ -1022,7 +1022,6 @@ static void closeSession(
*/
dcb_close(dcb);
/** decrease server current connection counters */
atomic_add(&bref->bref_backend->backend_server->stats.n_current, -1);
atomic_add(&bref->bref_backend->backend_conn_count, -1);
}
}
@ -1637,7 +1636,8 @@ static skygw_query_type_t is_read_tmp_table(
bool target_tmp_table = false;
int tsize = 0, klen = 0,i;
char** tbl = NULL;
char *hkey,*dbname;
char *dbname;
char hkey[MYSQL_DATABASE_MAXLEN+MYSQL_TABLE_MAXLEN+2];
MYSQL_session* data;
DCB* master_dcb = NULL;
@ -1665,12 +1665,7 @@ static skygw_query_type_t is_read_tmp_table(
/** Query targets at least one table */
for(i = 0; i<tsize && !target_tmp_table && tbl[i]; i++)
{
klen = strlen(dbname) + strlen(tbl[i]) + 2;
hkey = calloc(klen,sizeof(char));
strcpy(hkey,dbname);
strcat(hkey,".");
strcat(hkey,tbl[i]);
sprintf(hkey,"%s.%s",dbname,tbl[i]);
if (rses_prop_tmp &&
rses_prop_tmp->rses_prop_data.temp_tables)
{
@ -1685,8 +1680,6 @@ static skygw_query_type_t is_read_tmp_table(
"Query targets a temporary table: %s",hkey)));
}
}
free(hkey);
}
}
@ -2018,8 +2011,9 @@ static bool route_single_stmt(
GWBUF* querybuf)
{
skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN;
mysql_server_cmd_t packet_type;
mysql_server_cmd_t packet_type = MYSQL_COM_UNDEFINED;
uint8_t* packet;
size_t packet_len;
int ret = 0;
DCB* master_dcb = NULL;
DCB* target_dcb = NULL;
@ -2027,11 +2021,8 @@ static bool route_single_stmt(
bool succp = false;
int rlag_max = MAX_RLAG_UNDEFINED;
backend_type_t btype; /*< target backend type */
ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(querybuf));
packet = GWBUF_DATA(querybuf);
packet_type = packet[4];
/**
* Read stored master DCB pointer. If master is not set, routing must
@ -2059,7 +2050,19 @@ static bool route_single_stmt(
{
querybuf = gwbuf_make_contiguous(querybuf);
}
packet = GWBUF_DATA(querybuf);
packet_len = gw_mysql_get_byte3(packet);
if(packet_len == 0)
{
route_target = TARGET_MASTER;
packet_type = MYSQL_COM_UNDEFINED;
}
else
{
packet_type = packet[4];
switch(packet_type) {
case MYSQL_COM_QUIT: /*< 1 QUIT will close all sessions */
case MYSQL_COM_INIT_DB: /*< 2 DDL must go to the master */
@ -2274,7 +2277,7 @@ static bool route_single_stmt(
}
goto retblock;
}
}
/** Lock router session */
if (!rses_begin_locked_router_action(rses))
{
@ -2494,8 +2497,8 @@ static bool route_single_stmt(
rses_end_locked_router_action(rses);
goto retblock;
}
if ((ret = target_dcb->func.write(target_dcb, gwbuf_clone(querybuf))) == 1)
GWBUF* wbuf = gwbuf_clone(querybuf);
if ((ret = target_dcb->func.write(target_dcb, wbuf)) == 1)
{
backend_ref_t* bref;
@ -2509,7 +2512,8 @@ static bool route_single_stmt(
}
else
{
LOGIF(LE, (skygw_log_write_flush(
gwbuf_free(wbuf);
LOGIF((LE|LT), (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Routing query failed.")));
succp = false;
@ -2840,8 +2844,8 @@ static void clientReply (
/** There is one pending session command to be executed. */
if (sescmd_cursor_is_active(scur))
{
bool succp;
bool succp;
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Backend %s:%d processed reply and starts to execute "
@ -2850,8 +2854,15 @@ static void clientReply (
bref->bref_backend->backend_server->port)));
succp = execute_sescmd_in_backend(bref);
ss_dassert(succp);
ss_dassert(succp);
if(!succp)
{
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Backend %s:%d failed to execute session command.",
bref->bref_backend->backend_server->name,
bref->bref_backend->backend_server->port)));
}
}
else if (bref->bref_pending_cmd != NULL) /*< non-sescmd is waiting to be routed */
{
@ -2945,7 +2956,7 @@ static void bref_clear_state(
{
if(bref == NULL)
{
skygw_log_write(LE,"Error: NULL parameter passed to bref_clear_state. (%s:%d)",__FILE__,__LINE__);
skygw_log_write(LE,"[%s] Error: NULL parameter.",__FUNCTION__);
return;
}
if (state != BREF_WAITING_RESULT)
@ -2969,6 +2980,13 @@ static void bref_clear_state(
prev2 = atomic_add(
&bref->bref_backend->backend_server->stats.n_current_ops, -1);
ss_dassert(prev2 > 0);
if(prev2 <= 0)
{
skygw_log_write(LE,"[%s] Error: negative current operation count in backend %s:%u",
__FUNCTION__,
&bref->bref_backend->backend_server->name,
&bref->bref_backend->backend_server->port);
}
}
}
}
@ -2979,7 +2997,7 @@ static void bref_set_state(
{
if(bref == NULL)
{
skygw_log_write(LE,"Error: NULL parameter passed to bref_set_state. (%s:%d)",__FILE__,__LINE__);
skygw_log_write(LE,"[%s] Error: NULL parameter.",__FUNCTION__);
return;
}
if (state != BREF_WAITING_RESULT)
@ -2994,11 +3012,24 @@ static void bref_set_state(
/** Increase waiter count */
prev1 = atomic_add(&bref->bref_num_result_wait, 1);
ss_dassert(prev1 >= 0);
if(prev1 < 0)
{
skygw_log_write(LE,"[%s] Error: negative number of connections waiting for results in backend %s:%u",
__FUNCTION__,
&bref->bref_backend->backend_server->name,
&bref->bref_backend->backend_server->port);
}
/** Increase global operation count */
prev2 = atomic_add(
&bref->bref_backend->backend_server->stats.n_current_ops, 1);
ss_dassert(prev2 >= 0);
ss_dassert(prev2 >= 0);
if(prev2 < 0)
{
skygw_log_write(LE,"[%s] Error: negative current operation count in backend %s:%u",
__FUNCTION__,
&bref->bref_backend->backend_server->name,
&bref->bref_backend->backend_server->port);
}
}
}
@ -3554,7 +3585,6 @@ static rses_property_t* rses_property_init(
prop->rses_prop_chk_tail = CHK_NUM_ROUTER_PROPERTY;
#endif
return_prop:
CHK_RSES_PROP(prop);
return prop;
}
@ -3567,7 +3597,7 @@ static void rses_property_done(
{
if(prop == NULL)
{
skygw_log_write(LE,"Error: NULL parameter passed to rses_property_done. (%s:%d)",__FILE__,__LINE__);
skygw_log_write(LE,"[%s] Error: NULL parameter.",__FUNCTION__);
return;
}
CHK_RSES_PROP(prop);
@ -3652,7 +3682,7 @@ static mysql_sescmd_t* rses_property_get_sescmd(
if(prop == NULL)
{
skygw_log_write(LE,"Error: NULL parameter passed to rses_property_get_sescmd. (%s:%d)",__FILE__,__LINE__);
skygw_log_write(LE,"[%s] Error: NULL parameter.",__FUNCTION__);
return NULL;
}
@ -3702,7 +3732,7 @@ static void mysql_sescmd_done(
{
if(sescmd == NULL)
{
skygw_log_write(LE,"Error: NULL parameter passed to mysql_sescmd_done. (%s:%d)",__FILE__,__LINE__);
skygw_log_write(LE,"[%s] Error: NULL parameter.",__FUNCTION__);
return;
}
CHK_RSES_PROP(sescmd->my_sescmd_prop);
@ -3800,7 +3830,7 @@ static GWBUF* sescmd_cursor_process_replies(
/** Mark the rest session commands as replied */
scmd->my_sescmd_is_replied = true;
scmd->reply_cmd = *((unsigned char*)replybuf->start + 4);
skygw_log_write(LOGFILE_DEBUG,"Master '%s' responded to a session command.",
skygw_log_write(LT,"Master '%s' responded to a session command.",
bref->bref_backend->backend_server->unique_name);
int i;
@ -3820,6 +3850,11 @@ static GWBUF* sescmd_cursor_process_replies(
if(ses->rses_backend_ref[i].bref_dcb)
dcb_close(ses->rses_backend_ref[i].bref_dcb);
*reconnect = true;
skygw_log_write(LT,"Disabling slave %s:%d, result differs from master's result. Master: %d Slave: %d",
ses->rses_backend_ref[i].bref_backend->backend_server->name,
ses->rses_backend_ref[i].bref_backend->backend_server->port,
bref->reply_cmd,
ses->rses_backend_ref[i].reply_cmd);
}
}
}
@ -3827,11 +3862,17 @@ static GWBUF* sescmd_cursor_process_replies(
}
else
{
skygw_log_write(LOGFILE_DEBUG,"Slave '%s' responded faster to a session command.",
bref->bref_backend->backend_server->unique_name);
skygw_log_write(LT,"Slave '%s' responded before master to a session command. Result: %d",
bref->bref_backend->backend_server->unique_name,
(int)bref->reply_cmd);
if(bref->reply_cmd == 0xff)
{
SERVER* serv = bref->bref_backend->backend_server;
skygw_log_write(LE,"Error: Slave '%s' (%s:%u) failed to execute session command.",
serv->unique_name,serv->name,serv->port);
}
if(replybuf)
while((replybuf = gwbuf_consume(replybuf,gwbuf_length(replybuf))));
return NULL;
}
@ -3880,7 +3921,7 @@ static bool sescmd_cursor_is_active(
if(sescmd_cursor == NULL)
{
skygw_log_write(LE,"Error: NULL parameter passed to sescmd_cursor_is_active. (%s:%d)",__FILE__,__LINE__);
skygw_log_write(LE,"[%s] Error: NULL parameter.",__FUNCTION__);
return false;
}
ss_dassert(SPINLOCK_IS_LOCKED(&sescmd_cursor->scmd_cur_rses->rses_lock));
@ -3910,7 +3951,7 @@ static GWBUF* sescmd_cursor_clone_querybuf(
GWBUF* buf;
if(scur == NULL)
{
skygw_log_write(LE,"Error: NULL parameter passed to sescmd_cursor_clone_querybuf. (%s:%d)",__FILE__,__LINE__);
skygw_log_write(LE,"[%s] Error: NULL parameter.",__FUNCTION__);
return NULL;
}
ss_dassert(scur->scmd_cur_cmd != NULL);
@ -3928,7 +3969,7 @@ static bool sescmd_cursor_history_empty(
if(scur == NULL)
{
skygw_log_write(LE,"Error: NULL parameter passed to sescmd_cursor_history_empty. (%s:%d)",__FILE__,__LINE__);
skygw_log_write(LE,"[%s] Error: NULL parameter.",__FUNCTION__);
return true;
}
CHK_SESCMD_CUR(scur);
@ -3952,7 +3993,7 @@ static void sescmd_cursor_reset(
ROUTER_CLIENT_SES* rses;
if(scur == NULL)
{
skygw_log_write(LE,"Error: NULL parameter passed to sescmd_cursor_reset. (%s:%d)",__FILE__,__LINE__);
skygw_log_write(LE,"[%s] Error: NULL parameter.",__FUNCTION__);
return;
}
CHK_SESCMD_CUR(scur);
@ -3973,7 +4014,7 @@ static bool execute_sescmd_history(
sescmd_cursor_t* scur;
if(bref == NULL)
{
skygw_log_write(LE,"Error: NULL parameter passed to execute_sescmd_history. (%s:%d)",__FILE__,__LINE__);
skygw_log_write(LE,"[%s] Error: NULL parameter.",__FUNCTION__);
return false;
}
CHK_BACKEND_REF(bref);
@ -4014,7 +4055,7 @@ static bool execute_sescmd_in_backend(
GWBUF* buf;
if(backend_ref == NULL)
{
skygw_log_write(LE,"Error: NULL parameter passed to execute_sescmd_in_backend. (%s:%d)",__FILE__,__LINE__);
skygw_log_write(LE,"[%s] Error: NULL parameter.",__FUNCTION__);
return false;
}
if (BREF_IS_CLOSED(backend_ref))
@ -4123,7 +4164,7 @@ static bool sescmd_cursor_next(
if(scur == NULL)
{
skygw_log_write(LE,"Error: NULL parameter passed to sescmd_cursor_next. (%s:%d)",__FILE__,__LINE__);
skygw_log_write(LE,"[%s] Error: NULL parameter.",__FUNCTION__);
return false;
}
@ -5165,10 +5206,8 @@ static int router_handle_state_switch(
srv->name,
srv->port,
STRSRVSTATUS(srv))));
ses = dcb->session;
CHK_SESSION(ses);
rses = (ROUTER_CLIENT_SES *)dcb->session->router_session;
CHK_CLIENT_RSES(rses);
CHK_SESSION(((SESSION*)dcb->session));
CHK_CLIENT_RSES(((ROUTER_CLIENT_SES *)dcb->session->router_session));
switch (reason) {
case DCB_REASON_NOT_RESPONDING:

View File

@ -1179,7 +1179,6 @@ static void closeSession(
*/
dcb_close(dcb);
/** decrease server current connection counters */
atomic_add(&bref->bref_backend->backend_server->stats.n_current, -1);
atomic_add(&bref->bref_backend->backend_conn_count, -1);
}
}
@ -2924,11 +2923,16 @@ int bref_cmp_current_load(
return ((1000 * s1->stats.n_current_ops) - b1->weight)
- ((1000 * s2->stats.n_current_ops) - b2->weight);
}
static void bref_clear_state(
backend_ref_t* bref,
bref_state_t state)
{
if(bref == NULL)
{
skygw_log_write(LE,"[%s] Error: NULL parameter.",__FUNCTION__);
return;
}
if (state != BREF_WAITING_RESULT)
{
bref->bref_state &= ~state;
@ -2937,10 +2941,10 @@ static void bref_clear_state(
{
int prev1;
int prev2;
/** Decrease waiter count */
prev1 = atomic_add(&bref->bref_num_result_wait, -1);
if (prev1 <= 0) {
atomic_add(&bref->bref_num_result_wait, 1);
}
@ -2950,14 +2954,26 @@ static void bref_clear_state(
prev2 = atomic_add(
&bref->bref_backend->backend_server->stats.n_current_ops, -1);
ss_dassert(prev2 > 0);
}
if(prev2 <= 0)
{
skygw_log_write(LE,"[%s] Error: negative current operation count in backend %s:%u",
__FUNCTION__,
&bref->bref_backend->backend_server->name,
&bref->bref_backend->backend_server->port);
}
}
}
}
static void bref_set_state(
static void bref_set_state(
backend_ref_t* bref,
bref_state_t state)
{
if(bref == NULL)
{
skygw_log_write(LE,"[%s] Error: NULL parameter.",__FUNCTION__);
return;
}
if (state != BREF_WAITING_RESULT)
{
bref->bref_state |= state;
@ -2966,15 +2982,28 @@ static void bref_set_state(
{
int prev1;
int prev2;
/** Increase waiter count */
prev1 = atomic_add(&bref->bref_num_result_wait, 1);
ss_dassert(prev1 >= 0);
if(prev1 < 0)
{
skygw_log_write(LE,"[%s] Error: negative number of connections waiting for results in backend %s:%u",
__FUNCTION__,
&bref->bref_backend->backend_server->name,
&bref->bref_backend->backend_server->port);
}
/** Increase global operation count */
prev2 = atomic_add(
&bref->bref_backend->backend_server->stats.n_current_ops, 1);
ss_dassert(prev2 >= 0);
ss_dassert(prev2 >= 0);
if(prev2 < 0)
{
skygw_log_write(LE,"[%s] Error: negative current operation count in backend %s:%u",
__FUNCTION__,
&bref->bref_backend->backend_server->name,
&bref->bref_backend->backend_server->port);
}
}
}

View File

@ -319,7 +319,7 @@ parse_mapping_response(ROUTER_CLIENT_SES* rses, char* target, GWBUF* buf)
if(PTR_IS_RESULTSET(((unsigned char*)buf->start)) &&
modutil_count_signal_packets(buf,0,0,&more) == 2)
{
ptr = (char*)buf->start;
ptr = (unsigned char*)buf->start;
if(ptr[5] != 1)
{