poll.c:dcb_close Don't call poll_remove_dcb anymore if DCB has already been removed from poll set.
mysql_backend.c, mysql_client.c free error message GWBUF after calling handleError readconnroute.c:handleError send error message to client before returning. readwritesplit.c:handleError don't free error message buffer anymore since the caller of handleError frees it.
This commit is contained in:
@ -1140,8 +1140,28 @@ dcb_close(DCB *dcb)
|
|||||||
/*<
|
/*<
|
||||||
* Stop dcb's listening and modify state accordingly.
|
* Stop dcb's listening and modify state accordingly.
|
||||||
*/
|
*/
|
||||||
rc = poll_remove_dcb(dcb);
|
if (dcb->state == DCB_STATE_POLLING)
|
||||||
|
{
|
||||||
|
rc = poll_remove_dcb(dcb);
|
||||||
|
|
||||||
|
if (rc == 0) {
|
||||||
|
LOGIF(LD, (skygw_log_write(
|
||||||
|
LOGFILE_DEBUG,
|
||||||
|
"%lu [dcb_close] Removed dcb %p in state %s from "
|
||||||
|
"poll set.",
|
||||||
|
pthread_self(),
|
||||||
|
dcb,
|
||||||
|
STRDCBSTATE(dcb->state))));
|
||||||
|
} else {
|
||||||
|
LOGIF(LE, (skygw_log_write(
|
||||||
|
LOGFILE_ERROR,
|
||||||
|
"%lu [dcb_close] Error : Removing dcb %p in state %s from "
|
||||||
|
"poll set failed.",
|
||||||
|
pthread_self(),
|
||||||
|
dcb,
|
||||||
|
STRDCBSTATE(dcb->state))));
|
||||||
|
}
|
||||||
|
}
|
||||||
ss_dassert(dcb->state == DCB_STATE_NOPOLLING ||
|
ss_dassert(dcb->state == DCB_STATE_NOPOLLING ||
|
||||||
dcb->state == DCB_STATE_ZOMBIE);
|
dcb->state == DCB_STATE_ZOMBIE);
|
||||||
/**
|
/**
|
||||||
@ -1153,23 +1173,6 @@ dcb_close(DCB *dcb)
|
|||||||
}
|
}
|
||||||
dcb_call_callback(dcb, DCB_REASON_CLOSE);
|
dcb_call_callback(dcb, DCB_REASON_CLOSE);
|
||||||
|
|
||||||
if (rc == 0) {
|
|
||||||
LOGIF(LD, (skygw_log_write(
|
|
||||||
LOGFILE_DEBUG,
|
|
||||||
"%lu [dcb_close] Removed dcb %p in state %s from "
|
|
||||||
"poll set.",
|
|
||||||
pthread_self(),
|
|
||||||
dcb,
|
|
||||||
STRDCBSTATE(dcb->state))));
|
|
||||||
} else {
|
|
||||||
LOGIF(LE, (skygw_log_write(
|
|
||||||
LOGFILE_ERROR,
|
|
||||||
"%lu [dcb_close] Error : Removing dcb %p in state %s from "
|
|
||||||
"poll set failed.",
|
|
||||||
pthread_self(),
|
|
||||||
dcb,
|
|
||||||
STRDCBSTATE(dcb->state))));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (dcb->state == DCB_STATE_NOPOLLING)
|
if (dcb->state == DCB_STATE_NOPOLLING)
|
||||||
{
|
{
|
||||||
|
@ -377,7 +377,7 @@ static int gw_read_backend_event(DCB *dcb) {
|
|||||||
dcb,
|
dcb,
|
||||||
ERRACT_REPLY_CLIENT,
|
ERRACT_REPLY_CLIENT,
|
||||||
&succp);
|
&succp);
|
||||||
|
gwbuf_free(errbuf);
|
||||||
ss_dassert(!succp);
|
ss_dassert(!succp);
|
||||||
LOGIF(LD, (skygw_log_write(
|
LOGIF(LD, (skygw_log_write(
|
||||||
LOGFILE_DEBUG,
|
LOGFILE_DEBUG,
|
||||||
@ -459,6 +459,7 @@ static int gw_read_backend_event(DCB *dcb) {
|
|||||||
dcb,
|
dcb,
|
||||||
ERRACT_NEW_CONNECTION,
|
ERRACT_NEW_CONNECTION,
|
||||||
&succp);
|
&succp);
|
||||||
|
gwbuf_free(errbuf);
|
||||||
|
|
||||||
if (!succp)
|
if (!succp)
|
||||||
{
|
{
|
||||||
@ -848,6 +849,7 @@ static int gw_error_backend_event(DCB *dcb)
|
|||||||
dcb,
|
dcb,
|
||||||
ERRACT_NEW_CONNECTION,
|
ERRACT_NEW_CONNECTION,
|
||||||
&succp);
|
&succp);
|
||||||
|
gwbuf_free(errbuf);
|
||||||
|
|
||||||
/** There are not required backends available, close session. */
|
/** There are not required backends available, close session. */
|
||||||
if (!succp) {
|
if (!succp) {
|
||||||
@ -1031,7 +1033,8 @@ gw_backend_hangup(DCB *dcb)
|
|||||||
ERRACT_NEW_CONNECTION,
|
ERRACT_NEW_CONNECTION,
|
||||||
&succp);
|
&succp);
|
||||||
|
|
||||||
/** There are not required backends available, close session. */
|
gwbuf_free(errbuf);
|
||||||
|
/** There are no required backends available, close session. */
|
||||||
if (!succp)
|
if (!succp)
|
||||||
{
|
{
|
||||||
#if defined(SS_DEBUG)
|
#if defined(SS_DEBUG)
|
||||||
@ -1039,7 +1042,6 @@ gw_backend_hangup(DCB *dcb)
|
|||||||
LOGFILE_ERROR,
|
LOGFILE_ERROR,
|
||||||
"Backend hangup -> closing session.")));
|
"Backend hangup -> closing session.")));
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
spinlock_acquire(&session->ses_lock);
|
spinlock_acquire(&session->ses_lock);
|
||||||
session->state = SESSION_STATE_STOPPING;
|
session->state = SESSION_STATE_STOPPING;
|
||||||
spinlock_release(&session->ses_lock);
|
spinlock_release(&session->ses_lock);
|
||||||
@ -1176,6 +1178,7 @@ static int backend_write_delayqueue(DCB *dcb)
|
|||||||
dcb,
|
dcb,
|
||||||
ERRACT_NEW_CONNECTION,
|
ERRACT_NEW_CONNECTION,
|
||||||
&succp);
|
&succp);
|
||||||
|
gwbuf_free(errbuf);
|
||||||
|
|
||||||
if (!succp)
|
if (!succp)
|
||||||
{
|
{
|
||||||
|
@ -859,6 +859,7 @@ int gw_read_client_event(
|
|||||||
dcb,
|
dcb,
|
||||||
ERRACT_REPLY_CLIENT,
|
ERRACT_REPLY_CLIENT,
|
||||||
&succp);
|
&succp);
|
||||||
|
gwbuf_free(errbuf);
|
||||||
ss_dassert(!succp);
|
ss_dassert(!succp);
|
||||||
|
|
||||||
dcb_close(dcb);
|
dcb_close(dcb);
|
||||||
|
@ -701,7 +701,7 @@ routeQuery(ROUTER *instance, void *router_session, GWBUF *queue)
|
|||||||
goto return_rc;
|
goto return_rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
switch(mysql_command) {
|
switch(mysql_command) {
|
||||||
case MYSQL_COM_CHANGE_USER:
|
case MYSQL_COM_CHANGE_USER:
|
||||||
rc = backend_dcb->func.auth(
|
rc = backend_dcb->func.auth(
|
||||||
backend_dcb,
|
backend_dcb,
|
||||||
@ -815,21 +815,33 @@ clientReply(
|
|||||||
* @param action The action: REPLY, REPLY_AND_CLOSE, NEW_CONNECTION
|
* @param action The action: REPLY, REPLY_AND_CLOSE, NEW_CONNECTION
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
static void
|
static void handleError(
|
||||||
handleError(
|
ROUTER *instance,
|
||||||
ROUTER *instance,
|
void *router_session,
|
||||||
void *router_session,
|
GWBUF *errbuf,
|
||||||
GWBUF *errbuf,
|
DCB *backend_dcb,
|
||||||
DCB *backend_dcb,
|
error_action_t action,
|
||||||
error_action_t action,
|
bool *succp)
|
||||||
bool *succp)
|
|
||||||
{
|
{
|
||||||
DCB *client = NULL;
|
DCB *client_dcb;
|
||||||
SESSION *session = backend_dcb->session;
|
SESSION *session = backend_dcb->session;
|
||||||
client = session->client;
|
session_state_t sesstate;
|
||||||
|
|
||||||
|
spinlock_acquire(&session->ses_lock);
|
||||||
|
sesstate = session->state;
|
||||||
|
client_dcb = session->client;
|
||||||
|
spinlock_release(&session->ses_lock);
|
||||||
|
ss_dassert(client_dcb != NULL);
|
||||||
|
|
||||||
|
if (sesstate == SESSION_STATE_ROUTER_READY)
|
||||||
|
{
|
||||||
|
CHK_DCB(client_dcb);
|
||||||
|
client_dcb->func.write(client_dcb, gwbuf_clone(errbuf));
|
||||||
|
}
|
||||||
|
|
||||||
/** false because connection is not available anymore */
|
/** false because connection is not available anymore */
|
||||||
*succp = false;
|
*succp = false;
|
||||||
ss_dassert(client != NULL);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/** to be inline'd */
|
/** to be inline'd */
|
||||||
|
@ -4023,7 +4023,7 @@ static void rwsplit_process_router_options(
|
|||||||
*
|
*
|
||||||
* @param instance The router instance
|
* @param instance The router instance
|
||||||
* @param router_session The router session
|
* @param router_session The router session
|
||||||
* @param message The error message to reply
|
* @param errmsgbuf The error message to reply
|
||||||
* @param backend_dcb The backend DCB
|
* @param backend_dcb The backend DCB
|
||||||
* @param action The action: REPLY, REPLY_AND_CLOSE, NEW_CONNECTION
|
* @param action The action: REPLY, REPLY_AND_CLOSE, NEW_CONNECTION
|
||||||
* @param succp Result of action.
|
* @param succp Result of action.
|
||||||
@ -4088,114 +4088,104 @@ static void handleError (
|
|||||||
|
|
||||||
|
|
||||||
static void handle_error_reply_client(
|
static void handle_error_reply_client(
|
||||||
SESSION* ses,
|
SESSION* ses,
|
||||||
GWBUF* errmsg)
|
GWBUF* errmsg)
|
||||||
{
|
{
|
||||||
session_state_t sesstate;
|
session_state_t sesstate;
|
||||||
DCB* client_dcb;
|
DCB* client_dcb;
|
||||||
|
|
||||||
spinlock_acquire(&ses->ses_lock);
|
spinlock_acquire(&ses->ses_lock);
|
||||||
sesstate = ses->state;
|
sesstate = ses->state;
|
||||||
client_dcb = ses->client;
|
client_dcb = ses->client;
|
||||||
spinlock_release(&ses->ses_lock);
|
spinlock_release(&ses->ses_lock);
|
||||||
|
|
||||||
if (sesstate == SESSION_STATE_ROUTER_READY)
|
if (sesstate == SESSION_STATE_ROUTER_READY)
|
||||||
{
|
{
|
||||||
CHK_DCB(client_dcb);
|
CHK_DCB(client_dcb);
|
||||||
client_dcb->func.write(client_dcb, errmsg);
|
client_dcb->func.write(client_dcb, gwbuf_clone(errmsg));
|
||||||
}
|
}
|
||||||
else
|
|
||||||
{
|
|
||||||
while ((errmsg=gwbuf_consume(errmsg, GWBUF_LENGTH(errmsg))) != NULL)
|
|
||||||
;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This must be called with router lock
|
* This must be called with router lock
|
||||||
*/
|
*/
|
||||||
static bool handle_error_new_connection(
|
static bool handle_error_new_connection(
|
||||||
ROUTER_INSTANCE* inst,
|
ROUTER_INSTANCE* inst,
|
||||||
ROUTER_CLIENT_SES* rses,
|
ROUTER_CLIENT_SES* rses,
|
||||||
DCB* backend_dcb,
|
DCB* backend_dcb,
|
||||||
GWBUF* errmsg)
|
GWBUF* errmsg)
|
||||||
{
|
{
|
||||||
SESSION* ses;
|
SESSION* ses;
|
||||||
int router_nservers;
|
int router_nservers;
|
||||||
int max_nslaves;
|
int max_nslaves;
|
||||||
int max_slave_rlag;
|
int max_slave_rlag;
|
||||||
backend_ref_t* bref;
|
backend_ref_t* bref;
|
||||||
bool succp;
|
bool succp;
|
||||||
|
|
||||||
ss_dassert(SPINLOCK_IS_LOCKED(&rses->rses_lock));
|
ss_dassert(SPINLOCK_IS_LOCKED(&rses->rses_lock));
|
||||||
|
|
||||||
ses = backend_dcb->session;
|
ses = backend_dcb->session;
|
||||||
CHK_SESSION(ses);
|
CHK_SESSION(ses);
|
||||||
|
|
||||||
bref = get_bref_from_dcb(rses, backend_dcb);
|
bref = get_bref_from_dcb(rses, backend_dcb);
|
||||||
|
|
||||||
/** failed DCB has already been replaced */
|
/** failed DCB has already been replaced */
|
||||||
if (bref == NULL)
|
if (bref == NULL)
|
||||||
{
|
{
|
||||||
succp = true;
|
succp = true;
|
||||||
goto return_succp;
|
goto return_succp;
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
* Error handler is already called for this DCB because
|
* Error handler is already called for this DCB because
|
||||||
* it's not polling anymore. It can be assumed that
|
* it's not polling anymore. It can be assumed that
|
||||||
* it succeed because rses isn't closed.
|
* it succeed because rses isn't closed.
|
||||||
*/
|
*/
|
||||||
if (backend_dcb->state != DCB_STATE_POLLING)
|
if (backend_dcb->state != DCB_STATE_POLLING)
|
||||||
{
|
{
|
||||||
succp = true;
|
succp = true;
|
||||||
goto return_succp;
|
goto return_succp;
|
||||||
}
|
}
|
||||||
|
|
||||||
CHK_BACKEND_REF(bref);
|
CHK_BACKEND_REF(bref);
|
||||||
|
|
||||||
if (BREF_IS_WAITING_RESULT(bref))
|
if (BREF_IS_WAITING_RESULT(bref))
|
||||||
{
|
{
|
||||||
DCB* client_dcb;
|
DCB* client_dcb;
|
||||||
client_dcb = ses->client;
|
client_dcb = ses->client;
|
||||||
client_dcb->func.write(client_dcb, errmsg);
|
client_dcb->func.write(client_dcb, gwbuf_clone(errmsg));
|
||||||
bref_clear_state(bref, BREF_WAITING_RESULT);
|
bref_clear_state(bref, BREF_WAITING_RESULT);
|
||||||
}
|
}
|
||||||
else
|
bref_clear_state(bref, BREF_IN_USE);
|
||||||
{
|
bref_set_state(bref, BREF_CLOSED);
|
||||||
while ((errmsg=gwbuf_consume(errmsg, GWBUF_LENGTH(errmsg))) != NULL)
|
/**
|
||||||
;
|
* Remove callback because this DCB won't be used
|
||||||
}
|
* unless it is reconnected later, and then the callback
|
||||||
bref_clear_state(bref, BREF_IN_USE);
|
* is set again.
|
||||||
bref_set_state(bref, BREF_CLOSED);
|
*/
|
||||||
/**
|
dcb_remove_callback(backend_dcb,
|
||||||
* Remove callback because this DCB won't be used
|
DCB_REASON_NOT_RESPONDING,
|
||||||
* unless it is reconnected later, and then the callback
|
&router_handle_state_switch,
|
||||||
* is set again.
|
(void *)bref);
|
||||||
*/
|
|
||||||
dcb_remove_callback(backend_dcb,
|
|
||||||
DCB_REASON_NOT_RESPONDING,
|
|
||||||
&router_handle_state_switch,
|
|
||||||
(void *)bref);
|
|
||||||
|
|
||||||
router_nservers = router_get_servercount(inst);
|
router_nservers = router_get_servercount(inst);
|
||||||
max_nslaves = rses_get_max_slavecount(rses, router_nservers);
|
max_nslaves = rses_get_max_slavecount(rses, router_nservers);
|
||||||
max_slave_rlag = rses_get_max_replication_lag(rses);
|
max_slave_rlag = rses_get_max_replication_lag(rses);
|
||||||
/**
|
/**
|
||||||
* Try to get replacement slave or at least the minimum
|
* Try to get replacement slave or at least the minimum
|
||||||
* number of slave connections for router session.
|
* number of slave connections for router session.
|
||||||
*/
|
*/
|
||||||
succp = select_connect_backend_servers(
|
succp = select_connect_backend_servers(
|
||||||
&rses->rses_master_ref,
|
&rses->rses_master_ref,
|
||||||
rses->rses_backend_ref,
|
rses->rses_backend_ref,
|
||||||
router_nservers,
|
router_nservers,
|
||||||
max_nslaves,
|
max_nslaves,
|
||||||
max_slave_rlag,
|
max_slave_rlag,
|
||||||
rses->rses_config.rw_slave_select_criteria,
|
rses->rses_config.rw_slave_select_criteria,
|
||||||
ses,
|
ses,
|
||||||
inst);
|
inst);
|
||||||
|
|
||||||
return_succp:
|
return_succp:
|
||||||
return succp;
|
return succp;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user