When protocol closes DCB it calls dcb_close instead of dcb->func.close. dcb_close then calls dcb->func.close. This is now changed to all protocols and routers.

Rwsplit handles ERRACT_NEW_CONNECTION by clearing backend reference, removing callbacks and associating backend reference with new backend server. If it succeeds and the router session can continue, handleError returns true. Otherwise false. When ever false is returned it means that session must be closed.

Rwsplit now tolerates backend failures in a way that it searches new backends when monitor, backend, or client operation fails due to backend failure.
This commit is contained in:
VilhoRaatikka
2014-06-15 23:44:07 +03:00
parent 09d20d1059
commit 5bcae64538
16 changed files with 472 additions and 433 deletions

View File

@ -82,13 +82,13 @@ static void clientReply(
GWBUF* queue,
DCB* backend_dcb);
static void handleError(
ROUTER* instance,
void* router_session,
char* message,
DCB* backend_dcb,
int action,
bool* succp);
static void handleError(
ROUTER* instance,
void* router_session,
GWBUF* errmsgbuf,
DCB* backend_dcb,
error_action_t action,
bool* succp);
static void print_error_packet(ROUTER_CLIENT_SES* rses, GWBUF* buf, DCB* dcb);
static int router_get_servercount(ROUTER_INSTANCE* router);
@ -179,9 +179,15 @@ static void rses_property_done(
static mysql_sescmd_t* rses_property_get_sescmd(
rses_property_t* prop);
static bool execute_sescmd_history(backend_ref_t* bref);
static bool execute_sescmd_in_backend(
backend_ref_t* backend_ref);
static void sescmd_cursor_reset(sescmd_cursor_t* scur);
static bool sescmd_cursor_history_empty(sescmd_cursor_t* scur);
static void sescmd_cursor_set_active(
sescmd_cursor_t* sescmd_cursor,
bool value);
@ -581,7 +587,10 @@ static void* newSession(
/**
* Find a backend servers to connect to.
* This command requires that rsession's lock is held.
*/
rses_begin_locked_router_action(client_rses);
succp = select_connect_backend_servers(&master_ref,
backend_ref,
router_nservers,
@ -589,6 +598,8 @@ static void* newSession(
client_rses->rses_config.rw_slave_select_criteria,
session,
router);
rses_end_locked_router_action(client_rses);
/** Both Master and at least 1 slave must be found */
if (!succp) {
@ -689,14 +700,10 @@ static void closeSession(
CHK_DCB(dcb);
bref_clear_state(&backend_ref[i], BREF_IN_USE);
bref_set_state(&backend_ref[i], BREF_CLOSED);
#if defined(ERRHANDLE)
/**
* closes protocol and dcb
*/
dcb_close(dcb);
#else
dcb->func.close(dcb);
#endif
/** decrease server current connection counters */
atomic_add(&backend_ref[i].bref_backend->backend_server->stats.n_current, -1);
atomic_add(&backend_ref[i].bref_backend->backend_conn_count, -1);
@ -868,6 +875,12 @@ return_succp:
* @param queue Gateway buffer queue with the packets received
*
* @return if succeed 1, otherwise 0
* If routeQuery fails, it means that router session has failed.
* In any tolerated failure, handleError is called and if necessary,
* an error message is sent to the client.
*
* For now, routeQuery don't tolerate errors, so any error will close
* the session. vraa 14.6.14
*/
static int routeQuery(
ROUTER* instance,
@ -1466,12 +1479,27 @@ static bool select_connect_backend_servers(
if (*p_master_ref != NULL &&
BREF_IS_IN_USE((*p_master_ref)))
{
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [select_connect_backend_servers] Master %p fd %d found.",
pthread_self(),
(*p_master_ref)->bref_dcb,
(*p_master_ref)->bref_dcb->fd)));
master_found = true;
master_connected = true;
}
/** New session or master failure case */
else
{
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [select_connect_backend_servers] Didn't find master ",
"for session %p rses %p.",
pthread_self(),
session,
backend_ref)));
master_found = false;
master_connected = false;
}
@ -1598,6 +1626,15 @@ static bool select_connect_backend_servers(
if (backend_ref[i].bref_dcb != NULL)
{
slaves_connected += 1;
/**
* Start executing session command
* history.
*/
execute_sescmd_history(&backend_ref[i]);
/**
* Callback which is called when
* node fails.
*/
dcb_add_callback(
backend_ref[i].bref_dcb,
DCB_REASON_NOT_RESPONDING,
@ -1641,6 +1678,13 @@ static bool select_connect_backend_servers(
if (backend_ref[i].bref_dcb != NULL)
{
master_connected = true;
dcb_add_callback(
backend_ref[i].bref_dcb,
DCB_REASON_NOT_RESPONDING,
&router_handle_state_switch,
(void *)&backend_ref[i]);
bref_clear_state(&backend_ref[i],
BREF_NOT_USED);
bref_set_state(&backend_ref[i],
@ -2131,6 +2175,61 @@ static GWBUF* sescmd_cursor_clone_querybuf(
return buf;
}
static bool sescmd_cursor_history_empty(
sescmd_cursor_t* scur)
{
bool succp;
CHK_SESCMD_CUR(scur);
if (scur->scmd_cur_rses->rses_properties[RSES_PROP_TYPE_SESCMD] == NULL)
{
succp = true;
}
else
{
succp = false;
}
return succp;
}
static void sescmd_cursor_reset(
sescmd_cursor_t* scur)
{
CHK_SESCMD_CUR(scur);
CHK_CLIENT_RSES(scur->scmd_cur_rses);
CHK_RSES_PROP((*scur->scmd_cur_ptr_property));
scur->scmd_cur_active = false;
scur->scmd_cur_cmd = &(*scur->scmd_cur_ptr_property)->rses_prop_data.sescmd;
}
static bool execute_sescmd_history(
backend_ref_t* bref)
{
bool succp;
sescmd_cursor_t* scur;
CHK_BACKEND_REF(bref);
scur = &bref->bref_sescmd_cur;
CHK_SESCMD_CUR(scur);
if (!sescmd_cursor_history_empty(scur))
{
sescmd_cursor_reset(scur);
succp = execute_sescmd_in_backend(bref);
}
else
{
succp = true;
}
return succp;
}
/**
* If session command cursor is passive, sends the command to backend for
* execution.
@ -2540,9 +2639,9 @@ static void rwsplit_process_options(
}
/**
* Error Handler routine
*
* The routine will handle errors that occurred in backend writes.
* Error Handler routine to resolve backend failures. If it succeeds then there
* are enough operative backends available and connected. Otherwise it fails,
* and session is terminated.
*
* @param instance The router instance
* @param router_session The router session
@ -2555,27 +2654,74 @@ static void rwsplit_process_options(
* tell whether router has enough master/slave connections to continue work.
*/
static void handleError (
ROUTER *instance,
void *router_session,
char *message,
DCB *backend_dcb,
int action,
bool *succp)
ROUTER* instance,
void* router_session,
GWBUF* errmsgbuf,
DCB* backend_dcb,
error_action_t action,
bool* succp)
{
DCB* client_dcb = NULL;
SESSION* session = backend_dcb->session;
DCB* client_dcb;
SESSION* session;
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance;
ROUTER_CLIENT_SES* rses = (ROUTER_CLIENT_SES *)router_session;
client_dcb = session->client;
CHK_DCB(client_dcb);
CHK_DCB(backend_dcb);
backend_dcb->dcb_errhandle_called = true;
session = backend_dcb->session;
CHK_SESSION(session);
switch (action) {
case ERRACT_NEW_CONNECTION:
{
int router_nservers;
int max_nslaves;
int router_nservers;
int max_nslaves;
backend_ref_t* bref;
CHK_CLIENT_RSES(rses);
if (!rses_begin_locked_router_action(rses))
{
*succp = false;
return;
}
/**
* Error handler is already called for this DCB because
* it's not polling anymore. It can be assumed that
* it succeed because rses isn't closed.
*/
if (backend_dcb->state != DCB_STATE_POLLING)
{
rses_end_locked_router_action(rses);
*succp = true;
return;
}
bref = get_bref_from_dcb(rses, backend_dcb);
CHK_BACKEND_REF(bref);
if (BREF_IS_WAITING_RESULT(bref))
{
DCB* client_dcb;
client_dcb = session->client;
client_dcb->func.write(client_dcb, errmsgbuf);
}
bref_clear_state(bref, BREF_IN_USE);
bref_clear_state(bref, BREF_WAITING_RESULT);
bref_set_state(bref, BREF_NOT_USED);
bref_set_state(bref, BREF_CLOSED);
/**
* Remove callback because this DCB won't be used
* unless it is reconnected later, and then the callback
* is set again.
*/
dcb_remove_callback(backend_dcb,
DCB_REASON_NOT_RESPONDING,
&router_handle_state_switch,
(void *)bref);
router_nservers = router_get_servercount(inst);
max_nslaves = rses_get_max_slavecount(rses, router_nservers);
/**
@ -2590,45 +2736,33 @@ static void handleError (
rses->rses_config.rw_slave_select_criteria,
session,
inst);
ss_dassert(*succp);
/** Too few or no slaves at all */
if (!succp)
{
if (session->state == SESSION_STATE_ROUTER_READY)
{
ROUTER* rsession;
ROUTER_OBJECT* router;
router = session->service->router;
spinlock_acquire(&session->ses_lock);
session->state = SESSION_STATE_STOPPING;
spinlock_release(&session->ses_lock);
rsession = session->router_session;
/*<
* rsession should never be NULL here.
*/
ss_dassert(rsession != NULL);
LOGIF(LD, (skygw_log_write_flush(
LOGFILE_DEBUG,
"%lu [gw_error_backend_event] "
"Call closeSession for backend "
"session.",
pthread_self())));
router->closeSession(instance, rsession);
}
}
}
rses_end_locked_router_action(rses);
break;
}
case ERRACT_REPLY_CLIENT:
{
session_state_t sesstate;
spinlock_acquire(&session->ses_lock);
sesstate = session->state;
client_dcb = session->client;
spinlock_release(&session->ses_lock);
if (sesstate == SESSION_STATE_ROUTER_READY)
{
CHK_DCB(client_dcb);
client_dcb->func.write(client_dcb, errmsgbuf);
}
succp = false; /** false because new servers aren's selected. */
break;
}
default:
*succp = false;
break;
}
}
}
static void print_error_packet(
@ -2727,19 +2861,25 @@ static backend_ref_t* get_bref_from_dcb(
DCB* dcb)
{
backend_ref_t* bref;
int i = 0;
CHK_DCB(dcb);
CHK_CLIENT_RSES(rses);
bref = rses->rses_backend_ref;
while (bref != NULL)
while (i<rses->rses_nbackends)
{
if (bref->bref_dcb == dcb)
{
break;
}
bref++;
i += 1;
}
if (i == rses->rses_nbackends)
{
bref = NULL;
}
return bref;
}
@ -2749,30 +2889,37 @@ static int router_handle_state_switch(
DCB_REASON reason,
void* data)
{
backend_ref_t* bref;
int rc = 1;
backend_ref_t* bref;
int rc = 1;
ROUTER_CLIENT_SES* rses;
SESSION* ses;
SERVER* srv;
CHK_DCB(dcb);
bref = (backend_ref_t *)data;
CHK_BACKEND_REF(bref);
if (bref->bref_dcb != dcb)
srv = bref->bref_backend->backend_server;
if (SERVER_IS_RUNNING(srv) && SERVER_IS_IN_CLUSTER(srv))
{
goto return_rc;
}
ses = dcb->session;
CHK_SESSION(ses);
rses = (ROUTER_CLIENT_SES *)dcb->session->router_session;
CHK_CLIENT_RSES(rses);
switch (reason) {
case DCB_REASON_NOT_RESPONDING:
if (BREF_IS_WAITING_RESULT(bref))
{
dcb->func.hangup(dcb);
}
dcb->func.hangup(dcb);
break;
default:
break;
}
return_rc:
return rc;
}
}