Merge remote-tracking branch 'origin/develop' into MXS-122

Conflicts:
	server/core/dcb.c
This commit is contained in:
counterpoint
2015-07-08 14:54:22 +01:00
45 changed files with 1189 additions and 774 deletions

View File

@ -226,7 +226,7 @@ static rses_property_t* mysql_sescmd_get_property(
static rses_property_t* rses_property_init(
rses_property_type_t prop_type);
static void rses_property_add(
static int rses_property_add(
ROUTER_CLIENT_SES* rses,
rses_property_t* prop);
@ -287,7 +287,7 @@ static sescmd_cursor_t* backend_ref_get_sescmd_cursor (backend_ref_t* bref);
static int router_handle_state_switch(DCB* dcb, DCB_REASON reason, void* data);
static bool handle_error_new_connection(
ROUTER_INSTANCE* inst,
ROUTER_CLIENT_SES* rses,
ROUTER_CLIENT_SES** rses,
DCB* backend_dcb,
GWBUF* errmsg);
static void handle_error_reply_client(
@ -1243,7 +1243,8 @@ static bool get_dcb(
SERVER_IS_SLAVE(b->backend_server) &&
(max_rlag == MAX_RLAG_UNDEFINED ||
(b->backend_server->rlag != MAX_RLAG_NOT_AVAILABLE &&
b->backend_server->rlag <= max_rlag)))
b->backend_server->rlag <= max_rlag)) &&
!rses->rses_config.master_reads)
{
/** found slave */
candidate_bref = &backend_ref[i];
@ -2941,6 +2942,11 @@ static void bref_clear_state(
backend_ref_t* bref,
bref_state_t state)
{
if(bref == NULL)
{
skygw_log_write(LE,"Error: NULL parameter passed to bref_clear_state. (%s:%d)",__FILE__,__LINE__);
return;
}
if (state != BREF_WAITING_RESULT)
{
bref->bref_state &= ~state;
@ -2970,6 +2976,11 @@ static void bref_set_state(
backend_ref_t* bref,
bref_state_t state)
{
if(bref == NULL)
{
skygw_log_write(LE,"Error: NULL parameter passed to bref_set_state. (%s:%d)",__FILE__,__LINE__);
return;
}
if (state != BREF_WAITING_RESULT)
{
bref->bref_state |= state;
@ -3533,7 +3544,8 @@ static rses_property_t* rses_property_init(
prop = (rses_property_t*)calloc(1, sizeof(rses_property_t));
if (prop == NULL)
{
goto return_prop;
skygw_log_write(LE,"Error: Malloc returned NULL. (%s:%d)",__FILE__,__LINE__);
return NULL;
}
prop->rses_prop_type = prop_type;
#if defined(SS_DEBUG)
@ -3541,7 +3553,6 @@ static rses_property_t* rses_property_init(
prop->rses_prop_chk_tail = CHK_NUM_ROUTER_PROPERTY;
#endif
return_prop:
CHK_RSES_PROP(prop);
return prop;
}
@ -3552,6 +3563,11 @@ return_prop:
static void rses_property_done(
rses_property_t* prop)
{
if(prop == NULL)
{
skygw_log_write(LE,"Error: NULL parameter passed to rses_property_done. (%s:%d)",__FILE__,__LINE__);
return;
}
CHK_RSES_PROP(prop);
switch (prop->rses_prop_type) {
@ -3585,10 +3601,20 @@ static void rses_property_done(
*
* Router client session must be locked.
*/
static void rses_property_add(
static int rses_property_add(
ROUTER_CLIENT_SES* rses,
rses_property_t* prop)
{
if(rses == NULL)
{
skygw_log_write(LE,"Error: Router client session is NULL. (%s:%d)",__FILE__,__LINE__);
return -1;
}
if(prop == NULL)
{
skygw_log_write(LE,"Error: Router client session property is NULL. (%s:%d)",__FILE__,__LINE__);
return -1;
}
rses_property_t* p;
CHK_CLIENT_RSES(rses);
@ -3610,6 +3636,7 @@ static void rses_property_add(
}
p->rses_prop_next = prop;
}
return 0;
}
/**
@ -3620,7 +3647,13 @@ static mysql_sescmd_t* rses_property_get_sescmd(
rses_property_t* prop)
{
mysql_sescmd_t* sescmd;
if(prop == NULL)
{
skygw_log_write(LE,"Error: NULL parameter passed to rses_property_get_sescmd. (%s:%d)",__FILE__,__LINE__);
return NULL;
}
CHK_RSES_PROP(prop);
ss_dassert(prop->rses_prop_rsession == NULL ||
SPINLOCK_IS_LOCKED(&prop->rses_prop_rsession->rses_lock));
@ -3633,22 +3666,6 @@ static mysql_sescmd_t* rses_property_get_sescmd(
}
return sescmd;
}
/**
static void rses_begin_locked_property_action(
rses_property_t* prop)
{
CHK_RSES_PROP(prop);
spinlock_acquire(&prop->rses_prop_lock);
}
static void rses_end_locked_property_action(
rses_property_t* prop)
{
CHK_RSES_PROP(prop);
spinlock_release(&prop->rses_prop_lock);
}
*/
/**
* Create session command property.
@ -3681,6 +3698,11 @@ static mysql_sescmd_t* mysql_sescmd_init (
static void mysql_sescmd_done(
mysql_sescmd_t* sescmd)
{
if(sescmd == NULL)
{
skygw_log_write(LE,"Error: NULL parameter passed to mysql_sescmd_done. (%s:%d)",__FILE__,__LINE__);
return;
}
CHK_RSES_PROP(sescmd->my_sescmd_prop);
gwbuf_free(sescmd->my_sescmd_buf);
memset(sescmd, 0, sizeof(mysql_sescmd_t));
@ -3763,7 +3785,7 @@ static GWBUF* sescmd_cursor_process_replies(
dcb_close(bref->bref_dcb);
*reconnect = true;
if(replybuf)
gwbuf_consume(replybuf,gwbuf_length(replybuf));
while((replybuf = gwbuf_consume(replybuf,gwbuf_length(replybuf))));
}
}
/** This is a response from the master and it is the "right" one.
@ -3806,7 +3828,7 @@ static GWBUF* sescmd_cursor_process_replies(
skygw_log_write(LOGFILE_DEBUG,"Slave '%s' responded faster to a session command.",
bref->bref_backend->backend_server->unique_name);
if(replybuf)
gwbuf_free(replybuf);
while((replybuf = gwbuf_consume(replybuf,gwbuf_length(replybuf))));
return NULL;
}
@ -3853,6 +3875,12 @@ static bool sescmd_cursor_is_active(
sescmd_cursor_t* sescmd_cursor)
{
bool succp;
if(sescmd_cursor == NULL)
{
skygw_log_write(LE,"Error: NULL parameter passed to sescmd_cursor_is_active. (%s:%d)",__FILE__,__LINE__);
return false;
}
ss_dassert(SPINLOCK_IS_LOCKED(&sescmd_cursor->scmd_cur_rses->rses_lock));
succp = sescmd_cursor->scmd_cur_active;
@ -3878,6 +3906,11 @@ static GWBUF* sescmd_cursor_clone_querybuf(
sescmd_cursor_t* scur)
{
GWBUF* buf;
if(scur == NULL)
{
skygw_log_write(LE,"Error: NULL parameter passed to sescmd_cursor_clone_querybuf. (%s:%d)",__FILE__,__LINE__);
return NULL;
}
ss_dassert(scur->scmd_cur_cmd != NULL);
buf = gwbuf_clone(scur->scmd_cur_cmd->my_sescmd_buf);
@ -3890,7 +3923,12 @@ static bool sescmd_cursor_history_empty(
sescmd_cursor_t* scur)
{
bool succp;
if(scur == NULL)
{
skygw_log_write(LE,"Error: NULL parameter passed to sescmd_cursor_history_empty. (%s:%d)",__FILE__,__LINE__);
return true;
}
CHK_SESCMD_CUR(scur);
if (scur->scmd_cur_rses->rses_properties[RSES_PROP_TYPE_SESCMD] == NULL)
@ -3910,6 +3948,11 @@ static void sescmd_cursor_reset(
sescmd_cursor_t* scur)
{
ROUTER_CLIENT_SES* rses;
if(scur == NULL)
{
skygw_log_write(LE,"Error: NULL parameter passed to sescmd_cursor_reset. (%s:%d)",__FILE__,__LINE__);
return;
}
CHK_SESCMD_CUR(scur);
CHK_CLIENT_RSES(scur->scmd_cur_rses);
rses = scur->scmd_cur_rses;
@ -3926,6 +3969,11 @@ static bool execute_sescmd_history(
{
bool succp;
sescmd_cursor_t* scur;
if(bref == NULL)
{
skygw_log_write(LE,"Error: NULL parameter passed to execute_sescmd_history. (%s:%d)",__FILE__,__LINE__);
return false;
}
CHK_BACKEND_REF(bref);
scur = &bref->bref_sescmd_cur;
@ -3961,7 +4009,12 @@ static bool execute_sescmd_in_backend(
bool succp;
int rc = 0;
sescmd_cursor_t* scur;
GWBUF* buf;
if(backend_ref == NULL)
{
skygw_log_write(LE,"Error: NULL parameter passed to execute_sescmd_in_backend. (%s:%d)",__FILE__,__LINE__);
return false;
}
if (BREF_IS_CLOSED(backend_ref))
{
succp = false;
@ -3993,27 +4046,9 @@ static bool execute_sescmd_in_backend(
/** Cursor is left active when function returns. */
sescmd_cursor_set_active(scur, true);
}
#if defined(SS_DEBUG)
LOGIF(LT, tracelog_routed_query(scur->scmd_cur_rses,
"execute_sescmd_in_backend",
backend_ref,
sescmd_cursor_clone_querybuf(scur)));
{
GWBUF* tmpbuf = sescmd_cursor_clone_querybuf(scur);
uint8_t* ptr = GWBUF_DATA(tmpbuf);
unsigned char cmd = MYSQL_GET_COMMAND(ptr);
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [execute_sescmd_in_backend] Just before write, fd "
"%d : cmd %s.",
pthread_self(),
dcb->fd,
STRPACKETTYPE(cmd))));
gwbuf_free(tmpbuf);
}
#endif /*< SS_DEBUG */
buf = sescmd_cursor_clone_querybuf(scur);
switch (scur->scmd_cur_cmd->my_sescmd_packet_type) {
case MYSQL_COM_CHANGE_USER:
/** This makes it possible to handle replies correctly */
@ -4022,7 +4057,7 @@ static bool execute_sescmd_in_backend(
dcb,
NULL,
dcb->session,
sescmd_cursor_clone_querybuf(scur));
buf);
break;
case MYSQL_COM_INIT_DB:
@ -4048,10 +4083,11 @@ static bool execute_sescmd_in_backend(
* Mark session command buffer, it triggers writing
* MySQL command to protocol
*/
gwbuf_set_type(scur->scmd_cur_cmd->my_sescmd_buf, GWBUF_TYPE_SESCMD);
rc = dcb->func.write(
dcb,
sescmd_cursor_clone_querybuf(scur));
buf);
break;
}
@ -4061,6 +4097,7 @@ static bool execute_sescmd_in_backend(
}
else
{
while((buf = GWBUF_CONSUME_ALL(buf)) != NULL);
succp = false;
}
return_succp:
@ -4082,6 +4119,12 @@ static bool sescmd_cursor_next(
rses_property_t* prop_curr;
rses_property_t* prop_next;
if(scur == NULL)
{
skygw_log_write(LE,"Error: NULL parameter passed to sescmd_cursor_next. (%s:%d)",__FILE__,__LINE__);
return false;
}
ss_dassert(scur != NULL);
ss_dassert(*(scur->scmd_cur_ptr_property) != NULL);
ss_dassert(SPINLOCK_IS_LOCKED(
@ -4408,11 +4451,21 @@ static bool route_session_write(
* prevent it from being released before properties
* are cleaned up as a part of router sessionclean-up.
*/
prop = rses_property_init(RSES_PROP_TYPE_SESCMD);
if((prop = rses_property_init(RSES_PROP_TYPE_SESCMD)) == NULL)
{
skygw_log_write(LE,"Error: Router session property initialization failed");
rses_end_locked_router_action(router_cli_ses);
return false;
}
mysql_sescmd_init(prop, querybuf, packet_type, router_cli_ses);
/** Add sescmd property to router client session */
rses_property_add(router_cli_ses, prop);
if(rses_property_add(router_cli_ses, prop) != 0)
{
skygw_log_write(LE,"Error: Session property addition failed.");
rses_end_locked_router_action(router_cli_ses);
return false;
}
for (i=0; i<router_cli_ses->rses_nbackends; i++)
{
@ -4535,7 +4588,10 @@ static void rwsplit_process_router_options(
int i;
char* value;
select_criteria_t c;
if(options == NULL)
return;
for (i = 0; options[i]; i++)
{
if ((value = strchr(options[i], '=')) == NULL)
@ -4588,6 +4644,10 @@ static void rwsplit_process_router_options(
{
router->rwsplit_config.disable_slave_recovery = config_truth_value(value);
}
else if(strcmp(options[i],"master_accept_reads") == 0)
{
router->rwsplit_config.master_reads = config_truth_value(value);
}
}
} /*< for */
}
@ -4619,7 +4679,7 @@ static void handleError (
SESSION* session;
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance;
ROUTER_CLIENT_SES* rses = (ROUTER_CLIENT_SES *)router_session;
CHK_DCB(backend_dcb);
/** Reset error handle flag from a given DCB */
@ -4686,14 +4746,15 @@ static void handleError (
{
/**
* This is called in hope of getting replacement for
* failed slave(s).
* failed slave(s). This call may free rses.
*/
*succp = handle_error_new_connection(inst,
rses,
&rses,
backend_dcb,
errmsgbuf);
}
rses_end_locked_router_action(rses);
/* Free the lock if rses still exists */
if (rses) rses_end_locked_router_action(rses);
break;
}
@ -4762,10 +4823,11 @@ static void handle_error_reply_client(
*/
static bool handle_error_new_connection(
ROUTER_INSTANCE* inst,
ROUTER_CLIENT_SES* rses,
ROUTER_CLIENT_SES** rses,
DCB* backend_dcb,
GWBUF* errmsg)
{
ROUTER_CLIENT_SES* myrses;
SESSION* ses;
int router_nservers;
int max_nslaves;
@ -4773,7 +4835,8 @@ static bool handle_error_new_connection(
backend_ref_t* bref;
bool succp;
ss_dassert(SPINLOCK_IS_LOCKED(&rses->rses_lock));
myrses = *rses;
ss_dassert(SPINLOCK_IS_LOCKED(&myrses->rses_lock));
ses = backend_dcb->session;
CHK_SESSION(ses);
@ -4781,7 +4844,7 @@ static bool handle_error_new_connection(
/**
* If bref == NULL it has been replaced already with another one.
*/
if ((bref = get_bref_from_dcb(rses, backend_dcb)) == NULL)
if ((bref = get_bref_from_dcb(myrses, backend_dcb)) == NULL)
{
succp = true;
goto return_succp;
@ -4824,25 +4887,25 @@ static bool handle_error_new_connection(
(void *)bref);
router_nservers = router_get_servercount(inst);
max_nslaves = rses_get_max_slavecount(rses, router_nservers);
max_slave_rlag = rses_get_max_replication_lag(rses);
max_nslaves = rses_get_max_slavecount(myrses, router_nservers);
max_slave_rlag = rses_get_max_replication_lag(myrses);
/**
* Try to get replacement slave or at least the minimum
* number of slave connections for router session.
*/
if(inst->rwsplit_config.disable_slave_recovery)
{
succp = have_enough_servers(&rses,1,router_nservers,inst) ? true : false;
succp = have_enough_servers(&myrses,1,router_nservers,inst) ? true : false;
}
else
{
succp = select_connect_backend_servers(
&rses->rses_master_ref,
rses->rses_backend_ref,
&myrses->rses_master_ref,
myrses->rses_backend_ref,
router_nservers,
max_nslaves,
max_slave_rlag,
rses->rses_config.rw_slave_select_criteria,
myrses->rses_config.rw_slave_select_criteria,
ses,
inst);
}
@ -5079,10 +5142,9 @@ static int router_handle_state_switch(
{
backend_ref_t* bref;
int rc = 1;
ROUTER_CLIENT_SES* rses;
SESSION* ses;
SERVER* srv;
ROUTER_CLIENT_SES* rses;
SESSION* ses;
CHK_DCB(dcb);
bref = (backend_ref_t *)data;
CHK_BACKEND_REF(bref);
@ -5103,8 +5165,7 @@ static int router_handle_state_switch(
STRSRVSTATUS(srv))));
ses = dcb->session;
CHK_SESSION(ses);
rses = (ROUTER_CLIENT_SES *)dcb->session->router_session;
rses = (ROUTER_CLIENT_SES *)dcb->session->router_session;
CHK_CLIENT_RSES(rses);
switch (reason) {