Do error handling in one big locked action

This is not the optimal way to do error handling but it should solve all
problems that could rise from the multi-threaded model of MaxScale.

By taking a lock at the start of handleError, we'll be able to modify the
dcb error handling flag in a thread-safe manner. This should prevent
double error handling for all DCBs.
This commit is contained in:
Markus Makela
2016-11-28 13:13:17 +02:00
parent f8f400bdfd
commit 0bc68742a6

View File

@ -69,7 +69,15 @@ MODULE_INFO info =
* @endverbatim * @endverbatim
*/ */
#define RW_CHK_DCB(bref, dcb) do{if(dcb->state == DCB_STATE_DISCONNECTED){MXS_NOTICE("DCB was closed on line %d and another attempt to close it is made on line %d." , (bref) ? (bref)->closed_at : -1, __LINE__);}}while(false) #define RW_CHK_DCB(bref, dcb) \
do{ \
if(dcb->state == DCB_STATE_DISCONNECTED){ \
MXS_NOTICE("DCB was closed on line %d and another attempt to close it is made on line %d." , \
(bref) ? (bref)->closed_at : -1, __LINE__); \
} \
}while (false)
#define RW_CLOSE_BREF(b) do{ if (bref){ bref->closed_at = __LINE__; } } while (false)
static char *version_str = "V1.1.0"; static char *version_str = "V1.1.0";
@ -956,7 +964,7 @@ static void closeSession(ROUTER *instance, void *router_session)
*/ */
RW_CHK_DCB(bref, dcb); RW_CHK_DCB(bref, dcb);
dcb_close(dcb); dcb_close(dcb);
bref->closed_at = __LINE__; RW_CLOSE_BREF(bref);
/** decrease server current connection counters */ /** decrease server current connection counters */
atomic_add(&bref->bref_backend->backend_conn_count, -1); atomic_add(&bref->bref_backend->backend_conn_count, -1);
} }
@ -3001,7 +3009,7 @@ bool connect_server(backend_ref_t *bref, SESSION *session, bool execute_history)
bref->bref_backend->backend_server->port); bref->bref_backend->backend_server->port);
RW_CHK_DCB(bref, bref->bref_dcb); RW_CHK_DCB(bref, bref->bref_dcb);
dcb_close(bref->bref_dcb); dcb_close(bref->bref_dcb);
bref->closed_at = __LINE__; RW_CLOSE_BREF(bref);
bref->bref_dcb = NULL; bref->bref_dcb = NULL;
} }
} }
@ -3318,7 +3326,7 @@ static bool select_connect_backend_servers(backend_ref_t **p_master_ref,
atomic_add(&backend_ref[i].bref_backend->backend_conn_count, -1); atomic_add(&backend_ref[i].bref_backend->backend_conn_count, -1);
RW_CHK_DCB(&backend_ref[i], backend_ref[i].bref_dcb); RW_CHK_DCB(&backend_ref[i], backend_ref[i].bref_dcb);
dcb_close(backend_ref[i].bref_dcb); dcb_close(backend_ref[i].bref_dcb);
backend_ref[i].closed_at = __LINE__; RW_CLOSE_BREF(&backend_ref[i]);
} }
} }
} }
@ -3562,7 +3570,7 @@ static GWBUF *sescmd_cursor_process_replies(GWBUF *replybuf,
{ {
RW_CHK_DCB(bref, bref->bref_dcb); RW_CHK_DCB(bref, bref->bref_dcb);
dcb_close(bref->bref_dcb); dcb_close(bref->bref_dcb);
bref->closed_at = __LINE__; RW_CLOSE_BREF(bref);
} }
*reconnect = true; *reconnect = true;
gwbuf_free(replybuf); gwbuf_free(replybuf);
@ -3604,7 +3612,7 @@ static GWBUF *sescmd_cursor_process_replies(GWBUF *replybuf,
{ {
RW_CHK_DCB(&ses->rses_backend_ref[i], ses->rses_backend_ref[i].bref_dcb); RW_CHK_DCB(&ses->rses_backend_ref[i], ses->rses_backend_ref[i].bref_dcb);
dcb_close(ses->rses_backend_ref[i].bref_dcb); dcb_close(ses->rses_backend_ref[i].bref_dcb);
ses->rses_backend_ref[i].closed_at = __LINE__; RW_CLOSE_BREF(&ses->rses_backend_ref[i]);
} }
*reconnect = true; *reconnect = true;
MXS_WARNING("Disabling slave %s:%d, result differs from " MXS_WARNING("Disabling slave %s:%d, result differs from "
@ -4425,6 +4433,13 @@ static void handleError(ROUTER *instance, void *router_session,
CHK_DCB(problem_dcb); CHK_DCB(problem_dcb);
if (!rses_begin_locked_router_action(rses))
{
/** Session is already closed */
*succp = false;
return;
}
/** Don't handle same error twice on same DCB */ /** Don't handle same error twice on same DCB */
if (problem_dcb->dcb_errhandle_called) if (problem_dcb->dcb_errhandle_called)
{ {
@ -4434,6 +4449,7 @@ static void handleError(ROUTER *instance, void *router_session,
* be safe with the code as it stands on 9 Sept 2015 - MNB * be safe with the code as it stands on 9 Sept 2015 - MNB
*/ */
*succp = true; *succp = true;
rses_end_locked_router_action(rses);
return; return;
} }
else else
@ -4443,6 +4459,7 @@ static void handleError(ROUTER *instance, void *router_session,
session = problem_dcb->session; session = problem_dcb->session;
bool close_dcb = true; bool close_dcb = true;
backend_ref_t *bref = get_bref_from_dcb(rses, problem_dcb);
if (session == NULL || rses == NULL) if (session == NULL || rses == NULL)
{ {
@ -4461,17 +4478,6 @@ static void handleError(ROUTER *instance, void *router_session,
{ {
case ERRACT_NEW_CONNECTION: case ERRACT_NEW_CONNECTION:
{ {
if (!rses_begin_locked_router_action(rses))
{
close_dcb = false; /* With the assumption that if the router session is closed,
* then so is the dcb.
*/
*succp = false;
break;
}
backend_ref_t *bref = get_bref_from_dcb(rses, problem_dcb);
/** /**
* If master has lost its Master status error can't be * If master has lost its Master status error can't be
* handled so that session could continue. * handled so that session could continue.
@ -4516,12 +4522,11 @@ static void handleError(ROUTER *instance, void *router_session,
"corresponding backend ref.", srv->name, srv->port); "corresponding backend ref.", srv->name, srv->port);
} }
} }
else else if (bref)
{ {
/** /** We should reconnect only if we find a backend for this
* This is called in hope of getting replacement for * DCB. If this DCB is an older DCB that has been closed,
* failed slave(s). * we can ignore it. */
*/
*succp = handle_error_new_connection(inst, &rses, problem_dcb, errmsgbuf); *succp = handle_error_new_connection(inst, &rses, problem_dcb, errmsgbuf);
} }
@ -4535,7 +4540,7 @@ static void handleError(ROUTER *instance, void *router_session,
if (bref && bref->bref_dcb == problem_dcb) if (bref && bref->bref_dcb == problem_dcb)
{ {
bref->closed_at = __LINE__; RW_CLOSE_BREF(bref);
} }
} }
else else
@ -4549,7 +4554,6 @@ static void handleError(ROUTER *instance, void *router_session,
} }
close_dcb = false; close_dcb = false;
rses_end_locked_router_action(rses);
break; break;
} }
@ -4570,14 +4574,11 @@ static void handleError(ROUTER *instance, void *router_session,
if (close_dcb) if (close_dcb)
{ {
backend_ref_t *bref = get_bref_from_dcb(rses, problem_dcb);
RW_CHK_DCB(bref, problem_dcb); RW_CHK_DCB(bref, problem_dcb);
dcb_close(problem_dcb); dcb_close(problem_dcb);
if (bref) RW_CLOSE_BREF(bref);
{
bref->closed_at = __LINE__;
}
} }
rses_end_locked_router_action(rses);
} }
static void handle_error_reply_client(SESSION *ses, ROUTER_CLIENT_SES *rses, static void handle_error_reply_client(SESSION *ses, ROUTER_CLIENT_SES *rses,
@ -4592,35 +4593,22 @@ static void handle_error_reply_client(SESSION *ses, ROUTER_CLIENT_SES *rses,
client_dcb = ses->client_dcb; client_dcb = ses->client_dcb;
spinlock_release(&ses->ses_lock); spinlock_release(&ses->ses_lock);
if (rses_begin_locked_router_action(rses)) if ((bref = get_bref_from_dcb(rses, backend_dcb)) != NULL)
{ {
/** CHK_BACKEND_REF(bref);
* If bref exists, mark it closed
*/
if ((bref = get_bref_from_dcb(rses, backend_dcb)) != NULL)
{
CHK_BACKEND_REF(bref);
if (BREF_IS_IN_USE(bref)) if (BREF_IS_IN_USE(bref))
{
close_failed_bref(bref, false);
RW_CHK_DCB(bref, backend_dcb);
dcb_close(backend_dcb);
bref->closed_at = __LINE__;
}
}
else
{ {
// All dcbs should be associated with a backend reference. close_failed_bref(bref, false);
ss_dassert(!true); RW_CHK_DCB(bref, backend_dcb);
dcb_close(backend_dcb);
RW_CLOSE_BREF(bref);
} }
rses_end_locked_router_action(rses);
} }
else else
{ {
// The session has already been closed, hence the dcb has been // All dcbs should be associated with a backend reference.
// closed as well. ss_dassert(!true);
} }
if (sesstate == SESSION_STATE_ROUTER_READY) if (sesstate == SESSION_STATE_ROUTER_READY)