This commit is contained in:
Marko
2018-06-05 11:37:18 +03:00
146 changed files with 540 additions and 199 deletions

View File

@ -579,7 +579,7 @@ static int ini_handler(void *userdata, const char *section, const char *name, co
if (strcmp(section, CN_GATEWAY) == 0 || strcasecmp(section, CN_MAXSCALE) == 0)
{
if (is_root_config_file)
if (is_root_config_file || is_persisted_config)
{
return handle_global_item(name, value);
}

View File

@ -482,7 +482,7 @@ bool modulecmd_cb(const MODULECMD *cmd, void *data)
std::string s = d->domain;
s += "/";
s += cmd->identifier;
ss_dassert(strcmp(d->domain, cmd->domain) == 0);
ss_dassert(strcasecmp(d->domain, cmd->domain) == 0);
json_object_set_new(obj, CN_LINKS, mxs_json_self_link(d->host, CN_MODULES, s.c_str()));
json_object_set_new(attr, CN_PARAMETERS, param);

View File

@ -389,6 +389,9 @@ const char* qc_op_to_string(qc_query_op_t op)
case QUERY_OP_LOAD:
return "QUERY_OP_LOAD";
case QUERY_OP_LOAD_LOCAL:
return "QUERY_OP_LOAD_LOCAL";
case QUERY_OP_REVOKE:
return "QUERY_OP_REVOKE";

View File

@ -85,6 +85,7 @@ static inline fw_op_t qc_op_to_fw_op(qc_query_op_t op)
case QUERY_OP_INSERT:
return FW_OP_INSERT;
case QUERY_OP_LOAD_LOCAL:
case QUERY_OP_LOAD:
return FW_OP_LOAD;
@ -288,4 +289,4 @@ char* create_error(const char* format, ...);
*/
bool rule_matches(Dbfw* my_instance, DbfwSession* my_session,
GWBUF *queue, SRule rule, char* query);
bool rule_is_active(SRule rule);
bool rule_is_active(SRule rule);

View File

@ -588,6 +588,10 @@ avro_binlog_end_t avro_read_all_events(Avro *router)
int n_events = hdr.event_size - event_header_length - BLRM_FDE_EVENT_TYPES_OFFSET - FDE_EXTRA_BYTES;
uint8_t* checksum = ptr + hdr.event_size - event_header_length - FDE_EXTRA_BYTES;
// Precaution to prevent writing too much in case new events are added
int real_len = MXS_MIN(n_events, (int)sizeof(router->event_type_hdr_lens));
memcpy(router->event_type_hdr_lens, ptr + BLRM_FDE_EVENT_TYPES_OFFSET, real_len);
router->event_types = n_events;
router->binlog_checksum = checksum[0];
}

View File

@ -344,7 +344,7 @@ newSession(MXS_ROUTER *instance, MXS_SESSION *session)
continue;
}
if (ref == master_host && (inst->bitvalue & SERVER_MASTER))
if (ref == master_host && inst->bitvalue == SERVER_MASTER)
{
/* If option is "master" return only the root Master as there could be
* intermediate masters (Relay Servers) and they must not be selected.

View File

@ -321,7 +321,7 @@ void RWSplitSession::correct_packet_sequence(GWBUF *buffer)
}
}
static void log_unexpected_response(DCB* dcb, GWBUF* buffer)
static void log_unexpected_response(SRWBackend& backend, GWBUF* buffer, GWBUF* current_query)
{
if (mxs_mysql_is_err_packet(buffer))
{
@ -336,21 +336,23 @@ static void log_unexpected_response(DCB* dcb, GWBUF* buffer)
if (errcode == ER_CONNECTION_KILLED)
{
MXS_INFO("Connection from '%s'@'%s' to '%s' was killed",
dcb->session->client_dcb->user,
dcb->session->client_dcb->remote,
dcb->server->name);
backend->dcb()->session->client_dcb->user,
backend->dcb()->session->client_dcb->remote,
backend->name());
}
else
{
MXS_WARNING("Server '%s' sent an unexpected error: %hu, %s",
dcb->server->name, errcode, errstr.c_str());
backend->name(), errcode, errstr.c_str());
}
}
else
{
std::string sql = current_query ? mxs::extract_sql(current_query, 1024) : "<not available>";
MXS_ERROR("Unexpected internal state: received response 0x%02hhx from "
"server '%s' when no response was expected",
mxs_mysql_get_command(buffer), dcb->server->name);
"server '%s' when no response was expected. Command: 0x%02hhx "
"Query: %s", mxs_mysql_get_command(buffer), backend->name(),
backend->current_command(), sql.c_str());
ss_dassert(false);
}
}
@ -385,39 +387,57 @@ GWBUF* RWSplitSession::handle_causal_read_reply(GWBUF *writebuf, SRWBackend& bac
void RWSplitSession::handle_trx_replay()
{
if (m_replayed_trx.empty())
{
// No more statements to execute
m_is_replay_active = false;
atomic_add_uint64(&m_router->stats().n_trx_replay, 1);
// Check that the checksums match.
SHA1Checksum chksum = m_trx.checksum();
chksum.finalize();
if (chksum == m_replayed_trx.checksum())
{
MXS_INFO("Checksums match, replay successful.");
if (m_interrupted_query.get())
{
MXS_INFO("Resuming execution: %s", mxs::extract_sql(m_interrupted_query.get()).c_str());
retry_query(m_interrupted_query.release(), 0);
}
}
else
{
MXS_INFO("Checksum mismatch, transaction replay failed. Closing connection.");
poll_fake_hangup_event(m_client);
}
}
else
if (m_replayed_trx.have_stmts())
{
// More statements to replay, pop the oldest one and execute it
GWBUF* buf = m_replayed_trx.pop_stmt();
MXS_INFO("Replaying: %s", mxs::extract_sql(buf, 1024).c_str());
retry_query(buf, 0);
}
else
{
// No more statements to execute
m_is_replay_active = false;
atomic_add_uint64(&m_router->stats().n_trx_replay, 1);
if (!m_replayed_trx.empty())
{
// Check that the checksums match.
SHA1Checksum chksum = m_trx.checksum();
chksum.finalize();
if (chksum == m_replayed_trx.checksum())
{
MXS_INFO("Checksums match, replay successful.");
if (m_interrupted_query.get())
{
MXS_INFO("Resuming execution: %s", mxs::extract_sql(m_interrupted_query.get()).c_str());
retry_query(m_interrupted_query.release(), 0);
}
}
else
{
MXS_INFO("Checksum mismatch, transaction replay failed. Closing connection.");
modutil_send_mysql_err_packet(m_client, 0, 0, 1927, "08S01",
"Transaction checksum mismatch encountered "
"when replaying transaction.");
poll_fake_hangup_event(m_client);
}
}
else
{
/**
* The transaction was "empty". This means that the start of the transaction
* did not finish before we started the replay process.
*
* The transaction that is being currently replayed has a result,
* whereas the original interrupted transaction had none. Due to this,
* the checksums would not match if they were to be compared.
*/
ss_info_dassert(!m_interrupted_query.get(), "Interrupted query should be empty");
}
}
}
void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb)
@ -431,7 +451,7 @@ void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb)
/** If we receive an unexpected response from the server, the internal
* logic cannot handle this situation. Routing the reply straight to
* the client should be the safest thing to do at this point. */
log_unexpected_response(backend_dcb, writebuf);
log_unexpected_response(backend, writebuf, m_current_query.get());
MXS_SESSION_ROUTE_REPLY(backend_dcb->session, writebuf);
return;
}
@ -499,9 +519,24 @@ void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb)
ss_dassert(m_config.transaction_replay);
handle_trx_replay();
// Ignore the response, the client doesn't need it
gwbuf_free(writebuf);
return;
/**
* If the start of the transaction was interrupted, we need to return
* the result to the client.
*
* This retrying of START TRANSACTION is done with the transaction replay
* mechanism instead of the normal query retry mechanism because the safeguards
* in the routing logic prevent retrying of individual queries inside transactions.
*
* If the transaction was not empty and some results have already been
* sent to the client, we must discard all responses that the client already has.
*/
if (!m_replayed_trx.empty())
{
// Client already has this response, discard it
gwbuf_free(writebuf);
return;
}
}
else if (m_config.transaction_replay && session_trx_is_ending(m_client->session))
{
@ -564,23 +599,47 @@ bool RWSplitSession::start_trx_replay()
if (!m_is_replay_active && m_config.transaction_replay && m_can_replay_trx)
{
// Stash any interrupted queries while we replay the transaction
m_interrupted_query.reset(m_current_query.release());
if (m_trx.have_stmts() || m_current_query.get())
{
// Stash any interrupted queries while we replay the transaction
m_interrupted_query.reset(m_current_query.release());
MXS_INFO("Starting transaction replay");
m_is_replay_active = true;
MXS_INFO("Starting transaction replay");
m_is_replay_active = true;
/**
* Copy the transaction for replaying and finalize it. This
* allows the checksums to be compared. The current transaction
* is closed as the replaying opens a new transaction.
*/
m_replayed_trx = m_trx;
m_replayed_trx.finalize();
m_trx.close();
/**
* Copy the transaction for replaying and finalize it. This
* allows the checksums to be compared. The current transaction
* is closed as the replaying opens a new transaction.
*/
m_replayed_trx = m_trx;
m_replayed_trx.finalize();
m_trx.close();
if (m_replayed_trx.have_stmts())
{
// Pop the first statement and start replaying the transaction
retry_query(m_replayed_trx.pop_stmt(), 0);
}
else
{
/**
* The transaction was only opened and no queries have been
* executed. The buffer should contain a query that starts
* a transaction.
*/
ss_info_dassert(qc_get_trx_type_mask(m_interrupted_query.get()) & QUERY_TYPE_BEGIN_TRX,
"The current query should start a transaction");
retry_query(m_interrupted_query.release(), 0);
}
}
else
{
ss_info_dassert(!session_is_autocommit(m_client->session),
"Session should have autocommit disabled if the transaction "
"had no statements and no query was interrupted");
}
// Pop the first statement and start replaying the transaction
retry_query(m_replayed_trx.pop_stmt(), 0);
rval = true;
}

View File

@ -83,13 +83,27 @@ public:
}
/**
* Check if transaction is empty
* Check if transaction has statements
*
* @return True if transaction has no statements
* @return True if transaction has statements
*
* @note This function should only be used when checking whether a transaction
* that is being replayed has ended. The empty() method can be used
* to check whether statements were added to the transaction.
*/
bool have_stmts() const
{
return !m_log.empty();
}
/**
* Check whether the transaction is empty
*
* @return True if no statements have been added to the transaction
*/
bool empty() const
{
return m_log.empty();
return m_size == 0;
}
/**

View File

@ -424,7 +424,7 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket)
/** We know where to route this query */
SSRBackend bref = get_bref_from_dcb(target_dcb);
if (op == QUERY_OP_LOAD)
if (op == QUERY_OP_LOAD_LOCAL)
{
m_load_target = bref->backend()->server;
}