Merge branch '2.0' into develop

This commit is contained in:
Markus Makela
2016-12-05 21:19:59 +02:00
18 changed files with 215 additions and 55 deletions

View File

@ -78,7 +78,7 @@ avro_client_handle_request(AVRO_INSTANCE *router, AVRO_CLIENT *client, GWBUF *qu
if (avro_client_do_registration(router, client, queue) == 0)
{
client->state = AVRO_CLIENT_ERRORED;
dcb_printf(client->dcb, "ERR, code 12, msg: Registration failed");
dcb_printf(client->dcb, "ERR, code 12, msg: Registration failed\n");
/* force disconnection */
dcb_close(client->dcb);
rval = 0;
@ -86,7 +86,7 @@ avro_client_handle_request(AVRO_INSTANCE *router, AVRO_CLIENT *client, GWBUF *qu
else
{
/* Send OK ack to client */
dcb_printf(client->dcb, "OK");
dcb_printf(client->dcb, "OK\n");
client->state = AVRO_CLIENT_REGISTERED;
MXS_INFO("%s: Client [%s] has completed REGISTRATION action",
@ -503,7 +503,7 @@ avro_client_process_command(AVRO_INSTANCE *router, AVRO_CLIENT *client, GWBUF *q
{
GWBUF *reply = gwbuf_alloc(5);
memcpy(GWBUF_DATA(reply), "ECHO:", 5);
reply = gwbuf_append(reply, queue);
reply = gwbuf_append(reply, gwbuf_clone(queue));
client->dcb->func.write(client->dcb, reply);
}
}
@ -556,11 +556,15 @@ const char* get_avrofile_name(const char *file_ptr, int data_len, char *dest)
static int send_row(DCB *dcb, json_t* row)
{
char *json = json_dumps(row, JSON_PRESERVE_ORDER);
GWBUF *buf;
size_t len = strlen(json);
GWBUF *buf = gwbuf_alloc(len + 1);
int rc = 0;
if (json && (buf = gwbuf_alloc_and_load(strlen(json), (void*)json)))
if (json && buf)
{
uint8_t *data = GWBUF_DATA(buf);
memcpy(data, json, len);
data[len] = '\n';
rc = dcb->func.write(dcb, buf);
}
else

View File

@ -433,10 +433,10 @@ blr_write_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint32_t size,
router->binlog_name,
strerror_r(errno, err_msg, sizeof(err_msg)));
/* Remove any partial event that was written */
if (ftruncate(router->binlog_fd, router->last_written))
if (ftruncate(router->binlog_fd, router->binlog_position))
{
MXS_ERROR("%s: Failed to truncate binlog record at %lu of %s, %s. ",
router->service->name, router->last_written,
router->service->name, router->binlog_position,
router->binlog_name,
strerror_r(errno, err_msg, sizeof(err_msg)));
}

View File

@ -398,9 +398,20 @@ static void closeSession(ROUTER *instance, void *router_session)
}
bref_clear_state(bref, BREF_IN_USE);
bref_set_state(bref, BREF_CLOSED);
dcb_close(dcb);
/** Decrease server reference connection count */
RW_CHK_DCB(bref, dcb);
/** MXS-956: This will prevent closed DCBs from being closed twice.
* It should not happen but for currently unknown reasons, a DCB
* gets closed twice; first in handleError and a second time here. */
if (dcb && dcb->state == DCB_STATE_POLLING)
{
dcb_close(dcb);
}
RW_CLOSE_BREF(bref);
/** decrease server current connection counters */
atomic_add(&bref->ref->connections, -1);
}
else
@ -1282,6 +1293,13 @@ static void handleError(ROUTER *instance, void *router_session,
CHK_DCB(problem_dcb);
if (!rses_begin_locked_router_action(rses))
{
/** Session is already closed */
*succp = false;
return;
}
/** Don't handle same error twice on same DCB */
if (problem_dcb->dcb_errhandle_called)
{
@ -1291,6 +1309,7 @@ static void handleError(ROUTER *instance, void *router_session,
* be safe with the code as it stands on 9 Sept 2015 - MNB
*/
*succp = true;
rses_end_locked_router_action(rses);
return;
}
else
@ -1300,6 +1319,7 @@ static void handleError(ROUTER *instance, void *router_session,
session = problem_dcb->session;
bool close_dcb = true;
backend_ref_t *bref = get_bref_from_dcb(rses, problem_dcb);
if (session == NULL || rses == NULL)
{
@ -1318,15 +1338,6 @@ static void handleError(ROUTER *instance, void *router_session,
{
case ERRACT_NEW_CONNECTION:
{
if (!rses_begin_locked_router_action(rses))
{
close_dcb = false; /* With the assumption that if the router session is closed,
* then so is the dcb.
*/
*succp = false;
break;
}
/**
* If master has lost its Master status error can't be
* handled so that session could continue.
@ -1334,7 +1345,6 @@ static void handleError(ROUTER *instance, void *router_session,
if (rses->rses_master_ref && rses->rses_master_ref->bref_dcb == problem_dcb)
{
SERVER *srv = rses->rses_master_ref->ref->server;
backend_ref_t *bref = get_bref_from_dcb(rses, problem_dcb);
bool can_continue = false;
if (rses->rses_config.rw_master_failure_mode != RW_FAIL_INSTANTLY &&
@ -1372,18 +1382,56 @@ static void handleError(ROUTER *instance, void *router_session,
"corresponding backend ref.", srv->name, srv->port);
}
}
else
else if (bref)
{
/**
* This is called in hope of getting replacement for
* failed slave(s).
*/
/** We should reconnect only if we find a backend for this
* DCB. If this DCB is an older DCB that has been closed,
* we can ignore it. */
*succp = handle_error_new_connection(inst, &rses, problem_dcb, errmsgbuf);
}
dcb_close(problem_dcb);
RW_CHK_DCB(bref, problem_dcb);
if (bref)
{
/** This is a valid DCB for a backend ref */
if (!BREF_IS_IN_USE(bref) || bref->bref_dcb != problem_dcb)
{
/** The backend is closed or the reference was replaced */
dcb_close(problem_dcb);
RW_CLOSE_BREF(bref);
}
else
{
MXS_ERROR("Backend '%s' is still in use and points to the problem DCB. Not closing.",
bref->ref->server->unique_name);
}
}
else
{
const char *remote = problem_dcb->state == DCB_STATE_POLLING &&
problem_dcb->server ? problem_dcb->server->unique_name : "CLOSED";
MXS_ERROR("DCB connected to '%s' is not in use by the router "
"session, not closing it. DCB is in state '%s'",
remote, STRDCBSTATE(problem_dcb->state));
MXS_ERROR("Backends currently in use:");
for (int i = 0; i < rses->rses_nbackends; i++)
{
dcb_state_t state = DCB_STATE_UNDEFINED;
if (BREF_IS_IN_USE(&rses->rses_backend_ref[i]))
{
state = rses->rses_backend_ref[i].bref_dcb->state;
}
MXS_ERROR("%p: %s - %p", &rses->rses_backend_ref[i], STRDCBSTATE(state),
rses->rses_backend_ref[i].bref_dcb);
}
}
close_dcb = false;
rses_end_locked_router_action(rses);
break;
}
@ -1404,8 +1452,11 @@ static void handleError(ROUTER *instance, void *router_session,
if (close_dcb)
{
RW_CHK_DCB(bref, problem_dcb);
dcb_close(problem_dcb);
RW_CLOSE_BREF(bref);
}
rses_end_locked_router_action(rses);
}
/**
@ -1428,33 +1479,22 @@ static void handle_error_reply_client(SESSION *ses, ROUTER_CLIENT_SES *rses,
client_dcb = ses->client_dcb;
spinlock_release(&ses->ses_lock);
if (rses_begin_locked_router_action(rses))
if ((bref = get_bref_from_dcb(rses, backend_dcb)) != NULL)
{
/**
* If bref exists, mark it closed
*/
if ((bref = get_bref_from_dcb(rses, backend_dcb)) != NULL)
{
CHK_BACKEND_REF(bref);
CHK_BACKEND_REF(bref);
if (BREF_IS_IN_USE(bref))
{
close_failed_bref(bref, false);
dcb_close(backend_dcb);
}
}
else
if (BREF_IS_IN_USE(bref))
{
// All dcbs should be associated with a backend reference.
ss_dassert(!true);
close_failed_bref(bref, false);
RW_CHK_DCB(bref, backend_dcb);
dcb_close(backend_dcb);
RW_CLOSE_BREF(bref);
}
rses_end_locked_router_action(rses);
}
else
{
// The session has already been closed, hence the dcb has been
// closed as well.
// All dcbs should be associated with a backend reference.
ss_dassert(!true);
}
if (sesstate == SESSION_STATE_ROUTER_READY)

View File

@ -200,6 +200,7 @@ typedef struct backend_ref_st
#if defined(SS_DEBUG)
skygw_chk_t bref_chk_tail;
#endif
int closed_at; /** DEBUG: Line number where this backend reference was closed */
} backend_ref_t;
/**

View File

@ -30,7 +30,17 @@ MXS_BEGIN_DECLS
* rwsplit_tmp_table_multi functions
*/
#include <maxscale/protocol/mysql.h>
#define RW_CHK_DCB(bref, dcb) \
do{ \
if(dcb->state == DCB_STATE_DISCONNECTED){ \
MXS_NOTICE("DCB was closed on line %d and another attempt to close it is made on line %d." , \
(bref) ? (bref)->closed_at : -1, __LINE__); \
} \
}while (false)
#define RW_CLOSE_BREF(b) do{ if (bref){ bref->closed_at = __LINE__; } } while (false)
/*
* The following are implemented in rwsplit_mysql.c
*/

View File

@ -301,7 +301,9 @@ bool select_connect_backend_servers(backend_ref_t **p_master_ref,
/** Decrease backend's connection counter. */
atomic_add(&backend_ref[i].ref->connections, -1);
RW_CHK_DCB(&backend_ref[i], backend_ref[i].bref_dcb);
dcb_close(backend_ref[i].bref_dcb);
RW_CLOSE_BREF(&backend_ref[i]);
}
}
}
@ -411,6 +413,7 @@ static bool connect_server(backend_ref_t *bref, SESSION *session, bool execute_h
if (bref->bref_dcb != NULL)
{
bref_clear_state(bref, BREF_CLOSED);
bref->closed_at = 0;
if (!execute_history || execute_sescmd_history(bref))
{
@ -429,7 +432,9 @@ static bool connect_server(backend_ref_t *bref, SESSION *session, bool execute_h
bref->ref->server->unique_name,
bref->ref->server->name,
bref->ref->server->port);
RW_CHK_DCB(bref, bref->bref_dcb);
dcb_close(bref->bref_dcb);
RW_CLOSE_BREF(bref);
bref->bref_dcb = NULL;
}
}

View File

@ -176,10 +176,9 @@ GWBUF *sescmd_cursor_process_replies(GWBUF *replybuf,
bref->ref->server->unique_name);
close_failed_bref(bref, true);
if (bref->bref_dcb)
{
dcb_close(bref->bref_dcb);
}
RW_CHK_DCB(bref, bref->bref_dcb);
dcb_close(bref->bref_dcb);
RW_CLOSE_BREF(bref);
*reconnect = true;
gwbuf_free(replybuf);
replybuf = NULL;
@ -217,7 +216,9 @@ GWBUF *sescmd_cursor_process_replies(GWBUF *replybuf,
if (ses->rses_backend_ref[i].bref_dcb)
{
RW_CHK_DCB(&ses->rses_backend_ref[i], ses->rses_backend_ref[i].bref_dcb);
dcb_close(ses->rses_backend_ref[i].bref_dcb);
RW_CLOSE_BREF(&ses->rses_backend_ref[i]);
}
*reconnect = true;
MXS_INFO("Disabling slave %s:%d, result differs from "