Rename readwritesplit member variables
Prefixed the variables with the `m_` prefix and changed some of them to be more descriptive.
This commit is contained in:
@ -114,11 +114,11 @@ route_target_t get_route_target(MXS_SESSION* session,
|
|||||||
void
|
void
|
||||||
log_transaction_status(RWSplitSession *rses, GWBUF *querybuf, uint32_t qtype)
|
log_transaction_status(RWSplitSession *rses, GWBUF *querybuf, uint32_t qtype)
|
||||||
{
|
{
|
||||||
if (rses->large_query)
|
if (rses->m_large_query)
|
||||||
{
|
{
|
||||||
MXS_INFO("> Processing large request with more than 2^24 bytes of data");
|
MXS_INFO("> Processing large request with more than 2^24 bytes of data");
|
||||||
}
|
}
|
||||||
else if (rses->load_data_state == LOAD_DATA_INACTIVE)
|
else if (rses->m_load_data_state == LOAD_DATA_INACTIVE)
|
||||||
{
|
{
|
||||||
uint8_t *packet = GWBUF_DATA(querybuf);
|
uint8_t *packet = GWBUF_DATA(querybuf);
|
||||||
unsigned char command = packet[4];
|
unsigned char command = packet[4];
|
||||||
@ -135,7 +135,7 @@ log_transaction_status(RWSplitSession *rses, GWBUF *querybuf, uint32_t qtype)
|
|||||||
len = RWSPLIT_TRACE_MSG_LEN;
|
len = RWSPLIT_TRACE_MSG_LEN;
|
||||||
}
|
}
|
||||||
|
|
||||||
MXS_SESSION *ses = rses->client_dcb->session;
|
MXS_SESSION *ses = rses->m_client->session;
|
||||||
const char *autocommit = session_is_autocommit(ses) ? "[enabled]" : "[disabled]";
|
const char *autocommit = session_is_autocommit(ses) ? "[enabled]" : "[disabled]";
|
||||||
const char *transaction = session_trx_is_active(ses) ? "[open]" : "[not open]";
|
const char *transaction = session_trx_is_active(ses) ? "[open]" : "[not open]";
|
||||||
uint32_t plen = MYSQL_GET_PACKET_LEN(querybuf);
|
uint32_t plen = MYSQL_GET_PACKET_LEN(querybuf);
|
||||||
@ -152,7 +152,7 @@ log_transaction_status(RWSplitSession *rses, GWBUF *querybuf, uint32_t qtype)
|
|||||||
else
|
else
|
||||||
{
|
{
|
||||||
MXS_INFO("> Processing LOAD DATA LOCAL INFILE: %lu bytes sent.",
|
MXS_INFO("> Processing LOAD DATA LOCAL INFILE: %lu bytes sent.",
|
||||||
rses->rses_load_data_sent);
|
rses->m_load_data_sent);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -232,23 +232,23 @@ void check_create_tmp_table(RWSplitSession *router_cli_ses,
|
|||||||
{
|
{
|
||||||
if (qc_query_is_type(type, QUERY_TYPE_CREATE_TMP_TABLE))
|
if (qc_query_is_type(type, QUERY_TYPE_CREATE_TMP_TABLE))
|
||||||
{
|
{
|
||||||
ss_dassert(router_cli_ses && querybuf && router_cli_ses->client_dcb &&
|
ss_dassert(router_cli_ses && querybuf && router_cli_ses->m_client &&
|
||||||
router_cli_ses->client_dcb->data);
|
router_cli_ses->m_client->data);
|
||||||
|
|
||||||
router_cli_ses->have_tmp_tables = true;
|
router_cli_ses->m_have_tmp_tables = true;
|
||||||
char* tblname = qc_get_created_table_name(querybuf);
|
char* tblname = qc_get_created_table_name(querybuf);
|
||||||
std::string table;
|
std::string table;
|
||||||
|
|
||||||
if (tblname && *tblname && strchr(tblname, '.') == NULL)
|
if (tblname && *tblname && strchr(tblname, '.') == NULL)
|
||||||
{
|
{
|
||||||
const char* db = mxs_mysql_get_current_db(router_cli_ses->client_dcb->session);
|
const char* db = mxs_mysql_get_current_db(router_cli_ses->m_client->session);
|
||||||
table += db;
|
table += db;
|
||||||
table += ".";
|
table += ".";
|
||||||
table += tblname;
|
table += tblname;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Add the table to the set of temporary tables */
|
/** Add the table to the set of temporary tables */
|
||||||
router_cli_ses->temp_tables.insert(table);
|
router_cli_ses->m_temp_tables.insert(table);
|
||||||
|
|
||||||
MXS_FREE(tblname);
|
MXS_FREE(tblname);
|
||||||
}
|
}
|
||||||
@ -259,7 +259,7 @@ void check_create_tmp_table(RWSplitSession *router_cli_ses,
|
|||||||
*/
|
*/
|
||||||
bool find_table(RWSplitSession* rses, const std::string& table)
|
bool find_table(RWSplitSession* rses, const std::string& table)
|
||||||
{
|
{
|
||||||
if (rses->temp_tables.find(table) != rses->temp_tables.end())
|
if (rses->m_temp_tables.find(table) != rses->m_temp_tables.end())
|
||||||
{
|
{
|
||||||
MXS_INFO("Query targets a temporary table: %s", table.c_str());
|
MXS_INFO("Query targets a temporary table: %s", table.c_str());
|
||||||
return false;
|
return false;
|
||||||
@ -286,7 +286,7 @@ static bool foreach_table(RWSplitSession* rses, GWBUF* querybuf, bool (*func)(RW
|
|||||||
|
|
||||||
for (int i = 0; i < n_tables; i++)
|
for (int i = 0; i < n_tables; i++)
|
||||||
{
|
{
|
||||||
const char* db = mxs_mysql_get_current_db(rses->client_dcb->session);
|
const char* db = mxs_mysql_get_current_db(rses->m_client->session);
|
||||||
std::string table;
|
std::string table;
|
||||||
|
|
||||||
if (strchr(tables[i], '.') == NULL)
|
if (strchr(tables[i], '.') == NULL)
|
||||||
@ -318,7 +318,7 @@ bool is_read_tmp_table(RWSplitSession *rses,
|
|||||||
GWBUF *querybuf,
|
GWBUF *querybuf,
|
||||||
uint32_t qtype)
|
uint32_t qtype)
|
||||||
{
|
{
|
||||||
ss_dassert(rses && querybuf && rses->client_dcb);
|
ss_dassert(rses && querybuf && rses->m_client);
|
||||||
bool rval = false;
|
bool rval = false;
|
||||||
|
|
||||||
if (qc_query_is_type(qtype, QUERY_TYPE_READ) ||
|
if (qc_query_is_type(qtype, QUERY_TYPE_READ) ||
|
||||||
@ -341,7 +341,7 @@ bool is_read_tmp_table(RWSplitSession *rses,
|
|||||||
*/
|
*/
|
||||||
bool delete_table(RWSplitSession *rses, const std::string& table)
|
bool delete_table(RWSplitSession *rses, const std::string& table)
|
||||||
{
|
{
|
||||||
rses->temp_tables.erase(table);
|
rses->m_temp_tables.erase(table);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -465,13 +465,13 @@ handle_multi_temp_and_load(RWSplitSession *rses, GWBUF *querybuf,
|
|||||||
* effective since we don't have a node to force queries to. In this
|
* effective since we don't have a node to force queries to. In this
|
||||||
* situation, assigning QUERY_TYPE_WRITE for the query will trigger
|
* situation, assigning QUERY_TYPE_WRITE for the query will trigger
|
||||||
* the error processing. */
|
* the error processing. */
|
||||||
if ((rses->target_node == NULL || rses->target_node != rses->current_master) &&
|
if ((rses->m_target_node == NULL || rses->m_target_node != rses->m_current_master) &&
|
||||||
(check_for_multi_stmt(querybuf, rses->client_dcb->protocol, packet_type) ||
|
(check_for_multi_stmt(querybuf, rses->m_client->protocol, packet_type) ||
|
||||||
check_for_sp_call(querybuf, packet_type)))
|
check_for_sp_call(querybuf, packet_type)))
|
||||||
{
|
{
|
||||||
if (rses->current_master && rses->current_master->in_use())
|
if (rses->m_current_master && rses->m_current_master->in_use())
|
||||||
{
|
{
|
||||||
rses->target_node = rses->current_master;
|
rses->m_target_node = rses->m_current_master;
|
||||||
MXS_INFO("Multi-statement query or stored procedure call, routing "
|
MXS_INFO("Multi-statement query or stored procedure call, routing "
|
||||||
"all future queries to master.");
|
"all future queries to master.");
|
||||||
}
|
}
|
||||||
@ -486,7 +486,7 @@ handle_multi_temp_and_load(RWSplitSession *rses, GWBUF *querybuf,
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
if (rses == NULL || querybuf == NULL ||
|
if (rses == NULL || querybuf == NULL ||
|
||||||
rses->client_dcb == NULL || rses->client_dcb->data == NULL)
|
rses->m_client == NULL || rses->m_client->data == NULL)
|
||||||
{
|
{
|
||||||
if (rses == NULL || querybuf == NULL)
|
if (rses == NULL || querybuf == NULL)
|
||||||
{
|
{
|
||||||
@ -494,12 +494,12 @@ handle_multi_temp_and_load(RWSplitSession *rses, GWBUF *querybuf,
|
|||||||
rses, querybuf);
|
rses, querybuf);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (rses->client_dcb == NULL)
|
if (rses->m_client == NULL)
|
||||||
{
|
{
|
||||||
MXS_ERROR("[%s] Error: Client DCB is NULL.", __FUNCTION__);
|
MXS_ERROR("[%s] Error: Client DCB is NULL.", __FUNCTION__);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (rses->client_dcb->data == NULL)
|
if (rses->m_client->data == NULL)
|
||||||
{
|
{
|
||||||
MXS_ERROR("[%s] Error: User data in master server DBC is NULL.",
|
MXS_ERROR("[%s] Error: User data in master server DBC is NULL.",
|
||||||
__FUNCTION__);
|
__FUNCTION__);
|
||||||
@ -511,7 +511,7 @@ handle_multi_temp_and_load(RWSplitSession *rses, GWBUF *querybuf,
|
|||||||
/**
|
/**
|
||||||
* Check if the query has anything to do with temporary tables.
|
* Check if the query has anything to do with temporary tables.
|
||||||
*/
|
*/
|
||||||
if (rses->have_tmp_tables && is_packet_a_query(packet_type))
|
if (rses->m_have_tmp_tables && is_packet_a_query(packet_type))
|
||||||
{
|
{
|
||||||
check_drop_tmp_table(rses, querybuf);
|
check_drop_tmp_table(rses, querybuf);
|
||||||
if (is_read_tmp_table(rses, querybuf, *qtype))
|
if (is_read_tmp_table(rses, querybuf, *qtype))
|
||||||
@ -526,17 +526,17 @@ handle_multi_temp_and_load(RWSplitSession *rses, GWBUF *querybuf,
|
|||||||
* Check if this is a LOAD DATA LOCAL INFILE query. If so, send all queries
|
* Check if this is a LOAD DATA LOCAL INFILE query. If so, send all queries
|
||||||
* to the master until the last, empty packet arrives.
|
* to the master until the last, empty packet arrives.
|
||||||
*/
|
*/
|
||||||
if (rses->load_data_state == LOAD_DATA_ACTIVE)
|
if (rses->m_load_data_state == LOAD_DATA_ACTIVE)
|
||||||
{
|
{
|
||||||
rses->rses_load_data_sent += gwbuf_length(querybuf);
|
rses->m_load_data_sent += gwbuf_length(querybuf);
|
||||||
}
|
}
|
||||||
else if (is_packet_a_query(packet_type))
|
else if (is_packet_a_query(packet_type))
|
||||||
{
|
{
|
||||||
qc_query_op_t queryop = qc_get_operation(querybuf);
|
qc_query_op_t queryop = qc_get_operation(querybuf);
|
||||||
if (queryop == QUERY_OP_LOAD)
|
if (queryop == QUERY_OP_LOAD)
|
||||||
{
|
{
|
||||||
rses->load_data_state = LOAD_DATA_START;
|
rses->m_load_data_state = LOAD_DATA_START;
|
||||||
rses->rses_load_data_sent = 0;
|
rses->m_load_data_sent = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -556,7 +556,7 @@ route_target_t get_target_type(RWSplitSession *rses, GWBUF *buffer,
|
|||||||
uint8_t* command, uint32_t* type, uint32_t* stmt_id)
|
uint8_t* command, uint32_t* type, uint32_t* stmt_id)
|
||||||
{
|
{
|
||||||
route_target_t route_target = TARGET_MASTER;
|
route_target_t route_target = TARGET_MASTER;
|
||||||
bool in_read_only_trx = rses->target_node && session_trx_is_read_only(rses->client_dcb->session);
|
bool in_read_only_trx = rses->m_target_node && session_trx_is_read_only(rses->m_client->session);
|
||||||
|
|
||||||
if (gwbuf_length(buffer) > MYSQL_HEADER_LEN)
|
if (gwbuf_length(buffer) > MYSQL_HEADER_LEN)
|
||||||
{
|
{
|
||||||
@ -599,7 +599,7 @@ route_target_t get_target_type(RWSplitSession *rses, GWBUF *buffer,
|
|||||||
* eventually to master
|
* eventually to master
|
||||||
*/
|
*/
|
||||||
|
|
||||||
if (rses->target_node && rses->target_node == rses->current_master)
|
if (rses->m_target_node && rses->m_target_node == rses->m_current_master)
|
||||||
{
|
{
|
||||||
/** The session is locked to the master */
|
/** The session is locked to the master */
|
||||||
route_target = TARGET_MASTER;
|
route_target = TARGET_MASTER;
|
||||||
@ -617,17 +617,17 @@ route_target_t get_target_type(RWSplitSession *rses, GWBUF *buffer,
|
|||||||
qc_get_operation(buffer) == QUERY_OP_EXECUTE)
|
qc_get_operation(buffer) == QUERY_OP_EXECUTE)
|
||||||
{
|
{
|
||||||
std::string id = get_text_ps_id(buffer);
|
std::string id = get_text_ps_id(buffer);
|
||||||
*type = rses->ps_manager.get_type(id);
|
*type = rses->m_ps_manager.get_type(id);
|
||||||
}
|
}
|
||||||
else if (mxs_mysql_is_ps_command(*command))
|
else if (mxs_mysql_is_ps_command(*command))
|
||||||
{
|
{
|
||||||
*stmt_id = get_internal_ps_id(rses, buffer);
|
*stmt_id = get_internal_ps_id(rses, buffer);
|
||||||
*type = rses->ps_manager.get_type(*stmt_id);
|
*type = rses->m_ps_manager.get_type(*stmt_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
MXS_SESSION* session = rses->client_dcb->session;
|
MXS_SESSION* session = rses->m_client->session;
|
||||||
mxs_target_t use_sql_variables_in = rses->rses_config.use_sql_variables_in;
|
mxs_target_t use_sql_variables_in = rses->m_config.use_sql_variables_in;
|
||||||
bool load_active = (rses->load_data_state != LOAD_DATA_INACTIVE);
|
bool load_active = (rses->m_load_data_state != LOAD_DATA_INACTIVE);
|
||||||
|
|
||||||
route_target = get_route_target(session,
|
route_target = get_route_target(session,
|
||||||
use_sql_variables_in,
|
use_sql_variables_in,
|
||||||
@ -638,9 +638,9 @@ route_target_t get_target_type(RWSplitSession *rses, GWBUF *buffer,
|
|||||||
else
|
else
|
||||||
{
|
{
|
||||||
/** Empty packet signals end of LOAD DATA LOCAL INFILE, send it to master*/
|
/** Empty packet signals end of LOAD DATA LOCAL INFILE, send it to master*/
|
||||||
rses->load_data_state = LOAD_DATA_END;
|
rses->m_load_data_state = LOAD_DATA_END;
|
||||||
MXS_INFO("> LOAD DATA LOCAL INFILE finished: %lu bytes sent.",
|
MXS_INFO("> LOAD DATA LOCAL INFILE finished: %lu bytes sent.",
|
||||||
rses->rses_load_data_sent + gwbuf_length(buffer));
|
rses->m_load_data_sent + gwbuf_length(buffer));
|
||||||
}
|
}
|
||||||
|
|
||||||
return route_target;
|
return route_target;
|
||||||
|
|||||||
@ -104,7 +104,7 @@ bool RWSplitSession::handle_target_is_all(route_target_t route_target, GWBUF *qu
|
|||||||
|
|
||||||
if (errbuf)
|
if (errbuf)
|
||||||
{
|
{
|
||||||
client_dcb->func.write(client_dcb, errbuf);
|
m_client->func.write(m_client, errbuf);
|
||||||
result = true;
|
result = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -115,7 +115,7 @@ bool RWSplitSession::handle_target_is_all(route_target_t route_target, GWBUF *qu
|
|||||||
{
|
{
|
||||||
|
|
||||||
result = true;
|
result = true;
|
||||||
atomic_add_uint64(&router->stats().n_all, 1);
|
atomic_add_uint64(&m_router->stats().n_all, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
|
|||||||
@ -72,9 +72,9 @@ void RWSplitSession::handle_connection_keepalive(SRWBackend& target)
|
|||||||
ss_dassert(target);
|
ss_dassert(target);
|
||||||
ss_debug(int nserv = 0);
|
ss_debug(int nserv = 0);
|
||||||
/** Each heartbeat is 1/10th of a second */
|
/** Each heartbeat is 1/10th of a second */
|
||||||
int keepalive = rses_config.connection_keepalive * 10;
|
int keepalive = m_config.connection_keepalive * 10;
|
||||||
|
|
||||||
for (auto it = backends.begin(); it != backends.end(); it++)
|
for (auto it = m_backends.begin(); it != m_backends.end(); it++)
|
||||||
{
|
{
|
||||||
SRWBackend backend = *it;
|
SRWBackend backend = *it;
|
||||||
|
|
||||||
@ -92,7 +92,7 @@ void RWSplitSession::handle_connection_keepalive(SRWBackend& target)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ss_dassert(nserv < rses_nbackends);
|
ss_dassert(nserv < m_nbackends);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool RWSplitSession::prepare_target(SRWBackend& target, route_target_t route_target)
|
bool RWSplitSession::prepare_target(SRWBackend& target, route_target_t route_target)
|
||||||
@ -103,17 +103,17 @@ bool RWSplitSession::prepare_target(SRWBackend& target, route_target_t route_tar
|
|||||||
if (!target->in_use() && target->can_connect())
|
if (!target->in_use() && target->can_connect())
|
||||||
{
|
{
|
||||||
if (TARGET_IS_SLAVE(route_target) ||
|
if (TARGET_IS_SLAVE(route_target) ||
|
||||||
(rses_config.master_reconnection && TARGET_IS_MASTER(route_target)))
|
(m_config.master_reconnection && TARGET_IS_MASTER(route_target)))
|
||||||
{
|
{
|
||||||
if ((!rses_config.disable_sescmd_history || recv_sescmd == 0))
|
if ((!m_config.disable_sescmd_history || m_recv_sescmd == 0))
|
||||||
{
|
{
|
||||||
rval = target->connect(client_dcb->session, &sescmd_list);
|
rval = target->connect(m_client->session, &m_sescmd_list);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
MXS_ERROR("Cannot reconnect to server '%s', session command"
|
MXS_ERROR("Cannot reconnect to server '%s', session command"
|
||||||
" history is disabled (session has executed"
|
" history is disabled (session has executed"
|
||||||
" %lu session commands).", target->name(), recv_sescmd);
|
" %lu session commands).", target->name(), m_recv_sescmd);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (TARGET_IS_MASTER(route_target))
|
else if (TARGET_IS_MASTER(route_target))
|
||||||
@ -164,12 +164,12 @@ bool RWSplitSession::route_single_stmt(GWBUF *querybuf, const RouteInfo& info)
|
|||||||
{
|
{
|
||||||
bool store_stmt = false;
|
bool store_stmt = false;
|
||||||
|
|
||||||
if (large_query)
|
if (m_large_query)
|
||||||
{
|
{
|
||||||
/** We're processing a large query that's split across multiple packets.
|
/** We're processing a large query that's split across multiple packets.
|
||||||
* Route it to the same backend where we routed the previous packet. */
|
* Route it to the same backend where we routed the previous packet. */
|
||||||
ss_dassert(prev_target);
|
ss_dassert(m_prev_target);
|
||||||
target = prev_target;
|
target = m_prev_target;
|
||||||
succp = true;
|
succp = true;
|
||||||
}
|
}
|
||||||
else if (TARGET_IS_NAMED_SERVER(route_target) || TARGET_IS_RLAG_MAX(route_target))
|
else if (TARGET_IS_NAMED_SERVER(route_target) || TARGET_IS_RLAG_MAX(route_target))
|
||||||
@ -189,19 +189,19 @@ bool RWSplitSession::route_single_stmt(GWBUF *querybuf, const RouteInfo& info)
|
|||||||
if ((target = handle_slave_is_target(command, stmt_id)))
|
if ((target = handle_slave_is_target(command, stmt_id)))
|
||||||
{
|
{
|
||||||
succp = true;
|
succp = true;
|
||||||
store_stmt = rses_config.retry_failed_reads;
|
store_stmt = m_config.retry_failed_reads;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (TARGET_IS_MASTER(route_target))
|
else if (TARGET_IS_MASTER(route_target))
|
||||||
{
|
{
|
||||||
succp = handle_master_is_target(&target);
|
succp = handle_master_is_target(&target);
|
||||||
|
|
||||||
if (!rses_config.strict_multi_stmt &&
|
if (!m_config.strict_multi_stmt &&
|
||||||
!rses_config.strict_sp_calls &&
|
!m_config.strict_sp_calls &&
|
||||||
target_node == current_master)
|
m_target_node == m_current_master)
|
||||||
{
|
{
|
||||||
/** Reset the forced node as we're in relaxed multi-statement mode */
|
/** Reset the forced node as we're in relaxed multi-statement mode */
|
||||||
target_node.reset();
|
m_target_node.reset();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -215,8 +215,8 @@ bool RWSplitSession::route_single_stmt(GWBUF *querybuf, const RouteInfo& info)
|
|||||||
else if (target->have_session_commands())
|
else if (target->have_session_commands())
|
||||||
{
|
{
|
||||||
// We need to wait until the session commands are executed
|
// We need to wait until the session commands are executed
|
||||||
expected_responses++;
|
m_expected_responses++;
|
||||||
query_queue = gwbuf_append(query_queue, gwbuf_clone(querybuf));
|
m_query_queue = gwbuf_append(m_query_queue, gwbuf_clone(querybuf));
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -230,14 +230,14 @@ bool RWSplitSession::route_single_stmt(GWBUF *querybuf, const RouteInfo& info)
|
|||||||
* information is used to route all COM_STMT_FETCH commands
|
* information is used to route all COM_STMT_FETCH commands
|
||||||
* to the same server where the COM_STMT_EXECUTE was done. */
|
* to the same server where the COM_STMT_EXECUTE was done. */
|
||||||
ss_dassert(stmt_id > 0);
|
ss_dassert(stmt_id > 0);
|
||||||
exec_map[stmt_id] = target;
|
m_exec_map[stmt_id] = target;
|
||||||
MXS_INFO("COM_STMT_EXECUTE on %s", target->uri());
|
MXS_INFO("COM_STMT_EXECUTE on %s", target->uri());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (succp && router->config().connection_keepalive &&
|
if (succp && m_router->config().connection_keepalive &&
|
||||||
(TARGET_IS_SLAVE(route_target) || TARGET_IS_MASTER(route_target)))
|
(TARGET_IS_SLAVE(route_target) || TARGET_IS_MASTER(route_target)))
|
||||||
{
|
{
|
||||||
handle_connection_keepalive(target);
|
handle_connection_keepalive(target);
|
||||||
@ -273,21 +273,21 @@ void RWSplitSession::purge_history(mxs::SSessionCommand& sescmd)
|
|||||||
// As the PS handles map to explicit IDs, we must retain all COM_STMT_PREPARE commands
|
// As the PS handles map to explicit IDs, we must retain all COM_STMT_PREPARE commands
|
||||||
if (sescmd->get_command() != MXS_COM_STMT_PREPARE)
|
if (sescmd->get_command() != MXS_COM_STMT_PREPARE)
|
||||||
{
|
{
|
||||||
auto first = std::find_if(sescmd_list.begin(), sescmd_list.end(),
|
auto first = std::find_if(m_sescmd_list.begin(), m_sescmd_list.end(),
|
||||||
mxs::equal_pointees(sescmd));
|
mxs::equal_pointees(sescmd));
|
||||||
|
|
||||||
if (first != sescmd_list.end())
|
if (first != m_sescmd_list.end())
|
||||||
{
|
{
|
||||||
// We have at least one of these commands. See if we have a second one
|
// We have at least one of these commands. See if we have a second one
|
||||||
auto second = std::find_if(std::next(first), sescmd_list.end(),
|
auto second = std::find_if(std::next(first), m_sescmd_list.end(),
|
||||||
mxs::equal_pointees(sescmd));
|
mxs::equal_pointees(sescmd));
|
||||||
|
|
||||||
if (second != sescmd_list.end())
|
if (second != m_sescmd_list.end())
|
||||||
{
|
{
|
||||||
// We have a total of three commands, remove the middle one
|
// We have a total of three commands, remove the middle one
|
||||||
auto old_cmd = *second;
|
auto old_cmd = *second;
|
||||||
sescmd_responses.erase(old_cmd->get_position());
|
m_sescmd_responses.erase(old_cmd->get_position());
|
||||||
sescmd_list.erase(second);
|
m_sescmd_list.erase(second);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -315,7 +315,7 @@ void RWSplitSession::purge_history(mxs::SSessionCommand& sescmd)
|
|||||||
bool RWSplitSession::route_session_write(GWBUF *querybuf, uint8_t command, uint32_t type)
|
bool RWSplitSession::route_session_write(GWBUF *querybuf, uint8_t command, uint32_t type)
|
||||||
{
|
{
|
||||||
/** The SessionCommand takes ownership of the buffer */
|
/** The SessionCommand takes ownership of the buffer */
|
||||||
uint64_t id = sescmd_count++;
|
uint64_t id = m_sescmd_count++;
|
||||||
mxs::SSessionCommand sescmd(new mxs::SessionCommand(querybuf, id));
|
mxs::SSessionCommand sescmd(new mxs::SessionCommand(querybuf, id));
|
||||||
bool expecting_response = mxs_mysql_command_will_respond(command);
|
bool expecting_response = mxs_mysql_command_will_respond(command);
|
||||||
int nsucc = 0;
|
int nsucc = 0;
|
||||||
@ -325,12 +325,12 @@ bool RWSplitSession::route_session_write(GWBUF *querybuf, uint8_t command, uint3
|
|||||||
qc_query_is_type(type, QUERY_TYPE_PREPARE_STMT))
|
qc_query_is_type(type, QUERY_TYPE_PREPARE_STMT))
|
||||||
{
|
{
|
||||||
gwbuf_set_type(querybuf, GWBUF_TYPE_COLLECT_RESULT);
|
gwbuf_set_type(querybuf, GWBUF_TYPE_COLLECT_RESULT);
|
||||||
ps_manager.store(querybuf, id);
|
m_ps_manager.store(querybuf, id);
|
||||||
}
|
}
|
||||||
|
|
||||||
MXS_INFO("Session write, routing to all servers.");
|
MXS_INFO("Session write, routing to all servers.");
|
||||||
|
|
||||||
for (auto it = backends.begin(); it != backends.end(); it++)
|
for (auto it = m_backends.begin(); it != m_backends.end(); it++)
|
||||||
{
|
{
|
||||||
SRWBackend& backend = *it;
|
SRWBackend& backend = *it;
|
||||||
|
|
||||||
@ -351,7 +351,7 @@ bool RWSplitSession::route_session_write(GWBUF *querybuf, uint8_t command, uint3
|
|||||||
|
|
||||||
if (expecting_response)
|
if (expecting_response)
|
||||||
{
|
{
|
||||||
expected_responses++;
|
m_expected_responses++;
|
||||||
}
|
}
|
||||||
|
|
||||||
MXS_INFO("Route query to %s \t%s",
|
MXS_INFO("Route query to %s \t%s",
|
||||||
@ -365,7 +365,7 @@ bool RWSplitSession::route_session_write(GWBUF *querybuf, uint8_t command, uint3
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (rses_config.max_sescmd_history > 0 && sescmd_list.size() >= rses_config.max_sescmd_history)
|
if (m_config.max_sescmd_history > 0 && m_sescmd_list.size() >= m_config.max_sescmd_history)
|
||||||
{
|
{
|
||||||
static bool warn_history_exceeded = true;
|
static bool warn_history_exceeded = true;
|
||||||
if (warn_history_exceeded)
|
if (warn_history_exceeded)
|
||||||
@ -377,40 +377,40 @@ bool RWSplitSession::route_session_write(GWBUF *querybuf, uint8_t command, uint3
|
|||||||
"command history, add `disable_sescmd_history=true` to "
|
"command history, add `disable_sescmd_history=true` to "
|
||||||
"service '%s'. To increase the limit (currently %lu), add "
|
"service '%s'. To increase the limit (currently %lu), add "
|
||||||
"`max_sescmd_history` to the same service and increase the value.",
|
"`max_sescmd_history` to the same service and increase the value.",
|
||||||
router->service()->name, rses_config.max_sescmd_history);
|
m_router->service()->name, m_config.max_sescmd_history);
|
||||||
warn_history_exceeded = false;
|
warn_history_exceeded = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
rses_config.disable_sescmd_history = true;
|
m_config.disable_sescmd_history = true;
|
||||||
rses_config.max_sescmd_history = 0;
|
m_config.max_sescmd_history = 0;
|
||||||
sescmd_list.clear();
|
m_sescmd_list.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (rses_config.disable_sescmd_history)
|
if (m_config.disable_sescmd_history)
|
||||||
{
|
{
|
||||||
/** Prune stored responses */
|
/** Prune stored responses */
|
||||||
ResponseMap::iterator it = sescmd_responses.lower_bound(lowest_pos);
|
ResponseMap::iterator it = m_sescmd_responses.lower_bound(lowest_pos);
|
||||||
|
|
||||||
if (it != sescmd_responses.end())
|
if (it != m_sescmd_responses.end())
|
||||||
{
|
{
|
||||||
sescmd_responses.erase(sescmd_responses.begin(), it);
|
m_sescmd_responses.erase(m_sescmd_responses.begin(), it);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
purge_history(sescmd);
|
purge_history(sescmd);
|
||||||
sescmd_list.push_back(sescmd);
|
m_sescmd_list.push_back(sescmd);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (nsucc)
|
if (nsucc)
|
||||||
{
|
{
|
||||||
sent_sescmd = id;
|
m_sent_sescmd = id;
|
||||||
|
|
||||||
if (!expecting_response)
|
if (!expecting_response)
|
||||||
{
|
{
|
||||||
/** The command doesn't generate a response so we increment the
|
/** The command doesn't generate a response so we increment the
|
||||||
* completed session command count */
|
* completed session command count */
|
||||||
recv_sescmd++;
|
m_recv_sescmd++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -431,7 +431,7 @@ SRWBackend RWSplitSession::get_hinted_backend(char *name)
|
|||||||
{
|
{
|
||||||
SRWBackend rval;
|
SRWBackend rval;
|
||||||
|
|
||||||
for (auto it = backends.begin(); it != backends.end(); it++)
|
for (auto it = m_backends.begin(); it != m_backends.end(); it++)
|
||||||
{
|
{
|
||||||
auto& backend = *it;
|
auto& backend = *it;
|
||||||
|
|
||||||
@ -450,9 +450,9 @@ SRWBackend RWSplitSession::get_hinted_backend(char *name)
|
|||||||
SRWBackend RWSplitSession::get_slave_backend(int max_rlag)
|
SRWBackend RWSplitSession::get_slave_backend(int max_rlag)
|
||||||
{
|
{
|
||||||
SRWBackend rval;
|
SRWBackend rval;
|
||||||
auto counts = get_slave_counts(backends, current_master);
|
auto counts = get_slave_counts(m_backends, m_current_master);
|
||||||
|
|
||||||
for (auto it = backends.begin(); it != backends.end(); it++)
|
for (auto it = m_backends.begin(); it != m_backends.end(); it++)
|
||||||
{
|
{
|
||||||
auto& backend = *it;
|
auto& backend = *it;
|
||||||
|
|
||||||
@ -462,15 +462,15 @@ SRWBackend RWSplitSession::get_slave_backend(int max_rlag)
|
|||||||
if (!rval)
|
if (!rval)
|
||||||
{
|
{
|
||||||
// No previous candidate, accept any valid server (includes master)
|
// No previous candidate, accept any valid server (includes master)
|
||||||
if ((backend->is_master() && backend == current_master) ||
|
if ((backend->is_master() && backend == m_current_master) ||
|
||||||
backend->is_slave())
|
backend->is_slave())
|
||||||
{
|
{
|
||||||
rval = backend;
|
rval = backend;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (backend->in_use() || counts.second < router->max_slave_count())
|
else if (backend->in_use() || counts.second < m_router->max_slave_count())
|
||||||
{
|
{
|
||||||
if (!rses_config.master_accept_reads && rval->is_master())
|
if (!m_config.master_accept_reads && rval->is_master())
|
||||||
{
|
{
|
||||||
// Pick slaves over masters with master_accept_reads=false
|
// Pick slaves over masters with master_accept_reads=false
|
||||||
rval = backend;
|
rval = backend;
|
||||||
@ -478,7 +478,7 @@ SRWBackend RWSplitSession::get_slave_backend(int max_rlag)
|
|||||||
else
|
else
|
||||||
{
|
{
|
||||||
// Compare the two servers and pick the best one
|
// Compare the two servers and pick the best one
|
||||||
rval = compare_backends(rval, backend, rses_config.slave_selection_criteria);
|
rval = compare_backends(rval, backend, m_config.slave_selection_criteria);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -491,7 +491,7 @@ SRWBackend RWSplitSession::get_master_backend()
|
|||||||
{
|
{
|
||||||
SRWBackend rval;
|
SRWBackend rval;
|
||||||
/** get root master from available servers */
|
/** get root master from available servers */
|
||||||
SRWBackend master = get_root_master(backends);
|
SRWBackend master = get_root_master(m_backends);
|
||||||
|
|
||||||
if (master)
|
if (master)
|
||||||
{
|
{
|
||||||
@ -532,10 +532,10 @@ SRWBackend RWSplitSession::get_target_backend(backend_type_t btype,
|
|||||||
char *name, int max_rlag)
|
char *name, int max_rlag)
|
||||||
{
|
{
|
||||||
/** Check whether using target_node as target SLAVE */
|
/** Check whether using target_node as target SLAVE */
|
||||||
if (target_node && session_trx_is_read_only(client_dcb->session))
|
if (m_target_node && session_trx_is_read_only(m_client->session))
|
||||||
{
|
{
|
||||||
MXS_DEBUG("In READ ONLY transaction, using server '%s'", target_node->name());
|
MXS_DEBUG("In READ ONLY transaction, using server '%s'", m_target_node->name());
|
||||||
return target_node;
|
return m_target_node;
|
||||||
}
|
}
|
||||||
|
|
||||||
SRWBackend rval;
|
SRWBackend rval;
|
||||||
@ -570,9 +570,9 @@ int RWSplitSession::get_max_replication_lag()
|
|||||||
int conf_max_rlag;
|
int conf_max_rlag;
|
||||||
|
|
||||||
/** if there is no configured value, then longest possible int is used */
|
/** if there is no configured value, then longest possible int is used */
|
||||||
if (rses_config.max_slave_replication_lag > 0)
|
if (m_config.max_slave_replication_lag > 0)
|
||||||
{
|
{
|
||||||
conf_max_rlag = rses_config.max_slave_replication_lag;
|
conf_max_rlag = m_config.max_slave_replication_lag;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -678,9 +678,9 @@ SRWBackend RWSplitSession::handle_slave_is_target(uint8_t cmd, uint32_t stmt_id)
|
|||||||
{
|
{
|
||||||
/** The COM_STMT_FETCH must be executed on the same server as the
|
/** The COM_STMT_FETCH must be executed on the same server as the
|
||||||
* COM_STMT_EXECUTE was executed on */
|
* COM_STMT_EXECUTE was executed on */
|
||||||
ExecMap::iterator it = exec_map.find(stmt_id);
|
ExecMap::iterator it = m_exec_map.find(stmt_id);
|
||||||
|
|
||||||
if (it != exec_map.end())
|
if (it != m_exec_map.end())
|
||||||
{
|
{
|
||||||
target = it->second;
|
target = it->second;
|
||||||
MXS_INFO("COM_STMT_FETCH on %s", target->uri());
|
MXS_INFO("COM_STMT_FETCH on %s", target->uri());
|
||||||
@ -698,7 +698,7 @@ SRWBackend RWSplitSession::handle_slave_is_target(uint8_t cmd, uint32_t stmt_id)
|
|||||||
|
|
||||||
if (target)
|
if (target)
|
||||||
{
|
{
|
||||||
atomic_add_uint64(&router->stats().n_slave, 1);
|
atomic_add_uint64(&m_router->stats().n_slave, 1);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -728,7 +728,7 @@ void RWSplitSession::log_master_routing_failure(bool found,
|
|||||||
else if (old_master && curr_master && old_master->in_use())
|
else if (old_master && curr_master && old_master->in_use())
|
||||||
{
|
{
|
||||||
/** We found a master but it's not the same connection */
|
/** We found a master but it's not the same connection */
|
||||||
ss_dassert(!rses_config.master_reconnection);
|
ss_dassert(!m_config.master_reconnection);
|
||||||
ss_dassert(old_master != curr_master);
|
ss_dassert(old_master != curr_master);
|
||||||
sprintf(errmsg, "Master server changed from '%s' to '%s'",
|
sprintf(errmsg, "Master server changed from '%s' to '%s'",
|
||||||
old_master->name(), curr_master->name());
|
old_master->name(), curr_master->name());
|
||||||
@ -744,7 +744,7 @@ void RWSplitSession::log_master_routing_failure(bool found,
|
|||||||
else
|
else
|
||||||
{
|
{
|
||||||
/** We never had a master connection, the session must be in read-only mode */
|
/** We never had a master connection, the session must be in read-only mode */
|
||||||
if (rses_config.master_failure_mode != RW_FAIL_INSTANTLY)
|
if (m_config.master_failure_mode != RW_FAIL_INSTANTLY)
|
||||||
{
|
{
|
||||||
sprintf(errmsg, "Session is in read-only mode because it was created "
|
sprintf(errmsg, "Session is in read-only mode because it was created "
|
||||||
"when no master was available");
|
"when no master was available");
|
||||||
@ -759,28 +759,28 @@ void RWSplitSession::log_master_routing_failure(bool found,
|
|||||||
}
|
}
|
||||||
|
|
||||||
MXS_WARNING("[%s] Write query received from %s@%s. %s. Closing client connection.",
|
MXS_WARNING("[%s] Write query received from %s@%s. %s. Closing client connection.",
|
||||||
router->service()->name, client_dcb->user,
|
m_router->service()->name, m_client->user,
|
||||||
client_dcb->remote, errmsg);
|
m_client->remote, errmsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool RWSplitSession::should_replace_master(SRWBackend& target)
|
bool RWSplitSession::should_replace_master(SRWBackend& target)
|
||||||
{
|
{
|
||||||
return rses_config.master_reconnection &&
|
return m_config.master_reconnection &&
|
||||||
// We have a target server and it's not the current master
|
// We have a target server and it's not the current master
|
||||||
target && target != current_master &&
|
target && target != m_current_master &&
|
||||||
// We are not inside a transaction (also checks for autocommit=1)
|
// We are not inside a transaction (also checks for autocommit=1)
|
||||||
!session_trx_is_active(client_dcb->session) &&
|
!session_trx_is_active(m_client->session) &&
|
||||||
// We are not locked to the old master
|
// We are not locked to the old master
|
||||||
!locked_to_master();
|
!locked_to_master();
|
||||||
}
|
}
|
||||||
|
|
||||||
void RWSplitSession::replace_master(SRWBackend& target)
|
void RWSplitSession::replace_master(SRWBackend& target)
|
||||||
{
|
{
|
||||||
current_master = target;
|
m_current_master = target;
|
||||||
|
|
||||||
// As the master has changed, we can reset the temporary table information
|
// As the master has changed, we can reset the temporary table information
|
||||||
have_tmp_tables = false;
|
m_have_tmp_tables = false;
|
||||||
temp_tables.clear();
|
m_temp_tables.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -801,30 +801,30 @@ bool RWSplitSession::handle_master_is_target(SRWBackend* dest)
|
|||||||
|
|
||||||
if (should_replace_master(target))
|
if (should_replace_master(target))
|
||||||
{
|
{
|
||||||
MXS_INFO("Replacing old master '%s' with new master '%s'", current_master ?
|
MXS_INFO("Replacing old master '%s' with new master '%s'", m_current_master ?
|
||||||
current_master->name() : "<no previous master>", target->name());
|
m_current_master->name() : "<no previous master>", target->name());
|
||||||
replace_master(target);
|
replace_master(target);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (target && target == current_master)
|
if (target && target == m_current_master)
|
||||||
{
|
{
|
||||||
atomic_add_uint64(&router->stats().n_master, 1);
|
atomic_add_uint64(&m_router->stats().n_master, 1);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
/** The original master is not available, we can't route the write */
|
/** The original master is not available, we can't route the write */
|
||||||
if (rses_config.master_failure_mode == RW_ERROR_ON_WRITE)
|
if (m_config.master_failure_mode == RW_ERROR_ON_WRITE)
|
||||||
{
|
{
|
||||||
succp = send_readonly_error(client_dcb);
|
succp = send_readonly_error(m_client);
|
||||||
|
|
||||||
if (current_master && current_master->in_use())
|
if (m_current_master && m_current_master->in_use())
|
||||||
{
|
{
|
||||||
current_master->close();
|
m_current_master->close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
log_master_routing_failure(succp, current_master, target);
|
log_master_routing_failure(succp, m_current_master, target);
|
||||||
succp = false;
|
succp = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -871,8 +871,8 @@ GWBUF* RWSplitSession::add_prefix_wait_gtid(SERVER *server, GWBUF *origin)
|
|||||||
GWBUF* rval = origin;
|
GWBUF* rval = origin;
|
||||||
const char* wait_func = (server->server_type == SERVER_TYPE_MARIADB) ?
|
const char* wait_func = (server->server_type == SERVER_TYPE_MARIADB) ?
|
||||||
MARIADB_WAIT_GTID_FUNC : MYSQL_WAIT_GTID_FUNC;
|
MARIADB_WAIT_GTID_FUNC : MYSQL_WAIT_GTID_FUNC;
|
||||||
const char *gtid_wait_timeout = router->config().causal_read_timeout.c_str();
|
const char *gtid_wait_timeout = m_router->config().causal_read_timeout.c_str();
|
||||||
const char *gtid_position = gtid_pos.c_str();
|
const char *gtid_position = m_gtid_pos.c_str();
|
||||||
|
|
||||||
/* Create a new buffer to store prefix sql */
|
/* Create a new buffer to store prefix sql */
|
||||||
size_t prefix_len = strlen(gtid_wait_stmt) + strlen(gtid_position) +
|
size_t prefix_len = strlen(gtid_wait_stmt) + strlen(gtid_position) +
|
||||||
@ -913,9 +913,9 @@ bool RWSplitSession::handle_got_target(GWBUF* querybuf, SRWBackend& target, bool
|
|||||||
* If the transaction is READ ONLY set forced_node to this backend.
|
* If the transaction is READ ONLY set forced_node to this backend.
|
||||||
* This SLAVE backend will be used until the COMMIT is seen.
|
* This SLAVE backend will be used until the COMMIT is seen.
|
||||||
*/
|
*/
|
||||||
if (!target_node && session_trx_is_read_only(client_dcb->session))
|
if (!m_target_node && session_trx_is_read_only(m_client->session))
|
||||||
{
|
{
|
||||||
target_node = target;
|
m_target_node = target;
|
||||||
MXS_DEBUG("Setting forced_node SLAVE to %s within an opened READ ONLY transaction",
|
MXS_DEBUG("Setting forced_node SLAVE to %s within an opened READ ONLY transaction",
|
||||||
target->name());
|
target->name());
|
||||||
}
|
}
|
||||||
@ -927,16 +927,16 @@ bool RWSplitSession::handle_got_target(GWBUF* querybuf, SRWBackend& target, bool
|
|||||||
ss_dassert(!target->have_session_commands());
|
ss_dassert(!target->have_session_commands());
|
||||||
|
|
||||||
mxs::Backend::response_type response = mxs::Backend::NO_RESPONSE;
|
mxs::Backend::response_type response = mxs::Backend::NO_RESPONSE;
|
||||||
wait_gtid_state = EXPECTING_NOTHING;
|
m_wait_gtid_state = EXPECTING_NOTHING;
|
||||||
uint8_t cmd = mxs_mysql_get_command(querybuf);
|
uint8_t cmd = mxs_mysql_get_command(querybuf);
|
||||||
GWBUF *send_buf = gwbuf_clone(querybuf);
|
GWBUF *send_buf = gwbuf_clone(querybuf);
|
||||||
if (cmd == COM_QUERY && router->config().enable_causal_read && gtid_pos != "")
|
if (cmd == COM_QUERY && m_router->config().enable_causal_read && m_gtid_pos != "")
|
||||||
{
|
{
|
||||||
send_buf = add_prefix_wait_gtid(target->server(), send_buf);
|
send_buf = add_prefix_wait_gtid(target->server(), send_buf);
|
||||||
wait_gtid_state = EXPECTING_WAIT_GTID_RESULT;
|
m_wait_gtid_state = EXPECTING_WAIT_GTID_RESULT;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (load_data_state != LOAD_DATA_ACTIVE &&
|
if (m_load_data_state != LOAD_DATA_ACTIVE &&
|
||||||
mxs_mysql_command_will_respond(cmd))
|
mxs_mysql_command_will_respond(cmd))
|
||||||
{
|
{
|
||||||
response = mxs::Backend::EXPECT_RESPONSE;
|
response = mxs::Backend::EXPECT_RESPONSE;
|
||||||
@ -946,55 +946,55 @@ bool RWSplitSession::handle_got_target(GWBUF* querybuf, SRWBackend& target, bool
|
|||||||
|
|
||||||
if (target->write(send_buf, response))
|
if (target->write(send_buf, response))
|
||||||
{
|
{
|
||||||
if (store && !session_store_stmt(client_dcb->session, querybuf, target->server()))
|
if (store && !session_store_stmt(m_client->session, querybuf, target->server()))
|
||||||
{
|
{
|
||||||
MXS_ERROR("Failed to store current statement, it won't be retried if it fails.");
|
MXS_ERROR("Failed to store current statement, it won't be retried if it fails.");
|
||||||
}
|
}
|
||||||
|
|
||||||
atomic_add_uint64(&router->stats().n_queries, 1);
|
atomic_add_uint64(&m_router->stats().n_queries, 1);
|
||||||
|
|
||||||
if (!large_query && response == mxs::Backend::EXPECT_RESPONSE)
|
if (!large_query && response == mxs::Backend::EXPECT_RESPONSE)
|
||||||
{
|
{
|
||||||
/** The server will reply to this command */
|
/** The server will reply to this command */
|
||||||
ss_dassert(target->get_reply_state() == REPLY_STATE_DONE);
|
ss_dassert(target->get_reply_state() == REPLY_STATE_DONE);
|
||||||
target->set_reply_state(REPLY_STATE_START);
|
target->set_reply_state(REPLY_STATE_START);
|
||||||
expected_responses++;
|
m_expected_responses++;
|
||||||
|
|
||||||
if (load_data_state == LOAD_DATA_START)
|
if (m_load_data_state == LOAD_DATA_START)
|
||||||
{
|
{
|
||||||
/** The first packet contains the actual query and the server
|
/** The first packet contains the actual query and the server
|
||||||
* will respond to it */
|
* will respond to it */
|
||||||
load_data_state = LOAD_DATA_ACTIVE;
|
m_load_data_state = LOAD_DATA_ACTIVE;
|
||||||
}
|
}
|
||||||
else if (load_data_state == LOAD_DATA_END)
|
else if (m_load_data_state == LOAD_DATA_END)
|
||||||
{
|
{
|
||||||
/** The final packet in a LOAD DATA LOCAL INFILE is an empty packet
|
/** The final packet in a LOAD DATA LOCAL INFILE is an empty packet
|
||||||
* to which the server responds with an OK or an ERR packet */
|
* to which the server responds with an OK or an ERR packet */
|
||||||
ss_dassert(gwbuf_length(querybuf) == 4);
|
ss_dassert(gwbuf_length(querybuf) == 4);
|
||||||
load_data_state = LOAD_DATA_INACTIVE;
|
m_load_data_state = LOAD_DATA_INACTIVE;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((this->large_query = large_query))
|
if ((this->m_large_query = large_query))
|
||||||
{
|
{
|
||||||
/** Store the previous target as we're processing a multi-packet query */
|
/** Store the previous target as we're processing a multi-packet query */
|
||||||
prev_target = target;
|
m_prev_target = target;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
/** Otherwise reset it so we know the query is complete */
|
/** Otherwise reset it so we know the query is complete */
|
||||||
prev_target.reset();
|
m_prev_target.reset();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* If a READ ONLY transaction is ending set forced_node to NULL
|
* If a READ ONLY transaction is ending set forced_node to NULL
|
||||||
*/
|
*/
|
||||||
if (target_node &&
|
if (m_target_node &&
|
||||||
session_trx_is_read_only(client_dcb->session) &&
|
session_trx_is_read_only(m_client->session) &&
|
||||||
session_trx_is_ending(client_dcb->session))
|
session_trx_is_ending(m_client->session))
|
||||||
{
|
{
|
||||||
MXS_DEBUG("An opened READ ONLY transaction ends: forced_node is set to NULL");
|
MXS_DEBUG("An opened READ ONLY transaction ends: forced_node is set to NULL");
|
||||||
target_node.reset();
|
m_target_node.reset();
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -89,18 +89,18 @@ void RWSplitSession::process_sescmd_response(SRWBackend& backend, GWBUF** ppPack
|
|||||||
backend->add_ps_handle(id, resp.id);
|
backend->add_ps_handle(id, resp.id);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (recv_sescmd < sent_sescmd && id == recv_sescmd + 1)
|
if (m_recv_sescmd < m_sent_sescmd && id == m_recv_sescmd + 1)
|
||||||
{
|
{
|
||||||
if (!current_master || !current_master->in_use() || // Session doesn't have a master
|
if (!m_current_master || !m_current_master->in_use() || // Session doesn't have a master
|
||||||
current_master == backend) // This is the master's response
|
m_current_master == backend) // This is the master's response
|
||||||
{
|
{
|
||||||
/** First reply to this session command, route it to the client */
|
/** First reply to this session command, route it to the client */
|
||||||
++recv_sescmd;
|
++m_recv_sescmd;
|
||||||
discard = false;
|
discard = false;
|
||||||
|
|
||||||
/** Store the master's response so that the slave responses can
|
/** Store the master's response so that the slave responses can
|
||||||
* be compared to it */
|
* be compared to it */
|
||||||
sescmd_responses[id] = cmd;
|
m_sescmd_responses[id] = cmd;
|
||||||
|
|
||||||
if (cmd == MYSQL_REPLY_ERR)
|
if (cmd == MYSQL_REPLY_ERR)
|
||||||
{
|
{
|
||||||
@ -111,28 +111,28 @@ void RWSplitSession::process_sescmd_response(SRWBackend& backend, GWBUF** ppPack
|
|||||||
{
|
{
|
||||||
/** Map the returned response to the internal ID */
|
/** Map the returned response to the internal ID */
|
||||||
MXS_INFO("PS ID %u maps to internal ID %lu", resp.id, id);
|
MXS_INFO("PS ID %u maps to internal ID %lu", resp.id, id);
|
||||||
ps_handles[resp.id] = id;
|
m_ps_handles[resp.id] = id;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Discard any slave connections that did not return the same result
|
// Discard any slave connections that did not return the same result
|
||||||
for (SlaveResponseList::iterator it = slave_responses.begin();
|
for (SlaveResponseList::iterator it = m_slave_responses.begin();
|
||||||
it != slave_responses.end(); it++)
|
it != m_slave_responses.end(); it++)
|
||||||
{
|
{
|
||||||
discard_if_response_differs(it->first, cmd, it->second);
|
discard_if_response_differs(it->first, cmd, it->second);
|
||||||
}
|
}
|
||||||
|
|
||||||
slave_responses.clear();
|
m_slave_responses.clear();
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
/** Record slave command so that the response can be validated
|
/** Record slave command so that the response can be validated
|
||||||
* against the master's response when it arrives. */
|
* against the master's response when it arrives. */
|
||||||
slave_responses.push_back(std::make_pair(backend, cmd));
|
m_slave_responses.push_back(std::make_pair(backend, cmd));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
discard_if_response_differs(backend, sescmd_responses[id], cmd);
|
discard_if_response_differs(backend, m_sescmd_responses[id], cmd);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (discard)
|
if (discard)
|
||||||
|
|||||||
@ -22,31 +22,31 @@ RWSplitSession::RWSplitSession(RWSplit* instance, MXS_SESSION* session,
|
|||||||
const SRWBackendList& backends,
|
const SRWBackendList& backends,
|
||||||
const SRWBackend& master):
|
const SRWBackend& master):
|
||||||
mxs::RouterSession(session),
|
mxs::RouterSession(session),
|
||||||
backends(backends),
|
m_backends(backends),
|
||||||
current_master(master),
|
m_current_master(master),
|
||||||
large_query(false),
|
m_large_query(false),
|
||||||
rses_config(instance->config()),
|
m_config(instance->config()),
|
||||||
rses_nbackends(instance->service()->n_dbref),
|
m_nbackends(instance->service()->n_dbref),
|
||||||
load_data_state(LOAD_DATA_INACTIVE),
|
m_load_data_state(LOAD_DATA_INACTIVE),
|
||||||
have_tmp_tables(false),
|
m_have_tmp_tables(false),
|
||||||
rses_load_data_sent(0),
|
m_load_data_sent(0),
|
||||||
client_dcb(session->client_dcb),
|
m_client(session->client_dcb),
|
||||||
sescmd_count(1), // Needs to be a positive number to work
|
m_sescmd_count(1), // Needs to be a positive number to work
|
||||||
expected_responses(0),
|
m_expected_responses(0),
|
||||||
query_queue(NULL),
|
m_query_queue(NULL),
|
||||||
router(instance),
|
m_router(instance),
|
||||||
sent_sescmd(0),
|
m_sent_sescmd(0),
|
||||||
recv_sescmd(0),
|
m_recv_sescmd(0),
|
||||||
gtid_pos(""),
|
m_gtid_pos(""),
|
||||||
wait_gtid_state(EXPECTING_NOTHING),
|
m_wait_gtid_state(EXPECTING_NOTHING),
|
||||||
next_seq(0)
|
m_next_seq(0)
|
||||||
{
|
{
|
||||||
if (rses_config.rw_max_slave_conn_percent)
|
if (m_config.rw_max_slave_conn_percent)
|
||||||
{
|
{
|
||||||
int n_conn = 0;
|
int n_conn = 0;
|
||||||
double pct = (double)rses_config.rw_max_slave_conn_percent / 100.0;
|
double pct = (double)m_config.rw_max_slave_conn_percent / 100.0;
|
||||||
n_conn = MXS_MAX(floor((double)rses_nbackends * pct), 1);
|
n_conn = MXS_MAX(floor((double)m_nbackends * pct), 1);
|
||||||
rses_config.max_slave_connections = n_conn;
|
m_config.max_slave_connections = n_conn;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -93,15 +93,15 @@ void close_all_connections(SRWBackendList& backends)
|
|||||||
|
|
||||||
void RWSplitSession::close()
|
void RWSplitSession::close()
|
||||||
{
|
{
|
||||||
close_all_connections(backends);
|
close_all_connections(m_backends);
|
||||||
|
|
||||||
if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO) &&
|
if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO) &&
|
||||||
sescmd_list.size())
|
m_sescmd_list.size())
|
||||||
{
|
{
|
||||||
std::string sescmdstr;
|
std::string sescmdstr;
|
||||||
|
|
||||||
for (mxs::SessionCommandList::iterator it = sescmd_list.begin();
|
for (mxs::SessionCommandList::iterator it = m_sescmd_list.begin();
|
||||||
it != sescmd_list.end(); it++)
|
it != m_sescmd_list.end(); it++)
|
||||||
{
|
{
|
||||||
mxs::SSessionCommand& scmd = *it;
|
mxs::SSessionCommand& scmd = *it;
|
||||||
sescmdstr += scmd->to_string();
|
sescmdstr += scmd->to_string();
|
||||||
@ -116,11 +116,11 @@ int32_t RWSplitSession::routeQuery(GWBUF* querybuf)
|
|||||||
{
|
{
|
||||||
int rval = 0;
|
int rval = 0;
|
||||||
|
|
||||||
if (query_queue == NULL &&
|
if (m_query_queue == NULL &&
|
||||||
(expected_responses == 0 ||
|
(m_expected_responses == 0 ||
|
||||||
mxs_mysql_get_command(querybuf) == MXS_COM_STMT_FETCH ||
|
mxs_mysql_get_command(querybuf) == MXS_COM_STMT_FETCH ||
|
||||||
load_data_state == LOAD_DATA_ACTIVE ||
|
m_load_data_state == LOAD_DATA_ACTIVE ||
|
||||||
large_query))
|
m_large_query))
|
||||||
{
|
{
|
||||||
/** Gather the information required to make routing decisions */
|
/** Gather the information required to make routing decisions */
|
||||||
RouteInfo info(this, querybuf);
|
RouteInfo info(this, querybuf);
|
||||||
@ -137,15 +137,15 @@ int32_t RWSplitSession::routeQuery(GWBUF* querybuf)
|
|||||||
* We are already processing a request from the client. Store the
|
* We are already processing a request from the client. Store the
|
||||||
* new query and wait for the previous one to complete.
|
* new query and wait for the previous one to complete.
|
||||||
*/
|
*/
|
||||||
ss_dassert(expected_responses || query_queue);
|
ss_dassert(m_expected_responses || m_query_queue);
|
||||||
MXS_INFO("Storing query (len: %d cmd: %0x), expecting %d replies to current command",
|
MXS_INFO("Storing query (len: %d cmd: %0x), expecting %d replies to current command",
|
||||||
gwbuf_length(querybuf), GWBUF_DATA(querybuf)[4], expected_responses);
|
gwbuf_length(querybuf), GWBUF_DATA(querybuf)[4], m_expected_responses);
|
||||||
query_queue = gwbuf_append(query_queue, querybuf);
|
m_query_queue = gwbuf_append(m_query_queue, querybuf);
|
||||||
querybuf = NULL;
|
querybuf = NULL;
|
||||||
rval = 1;
|
rval = 1;
|
||||||
ss_dassert(expected_responses > 0);
|
ss_dassert(m_expected_responses > 0);
|
||||||
|
|
||||||
if (expected_responses == 0 && !route_stored_query())
|
if (m_expected_responses == 0 && !route_stored_query())
|
||||||
{
|
{
|
||||||
rval = 0;
|
rval = 0;
|
||||||
}
|
}
|
||||||
@ -176,7 +176,7 @@ bool RWSplitSession::route_stored_query()
|
|||||||
/** Loop over the stored statements as long as the routeQuery call doesn't
|
/** Loop over the stored statements as long as the routeQuery call doesn't
|
||||||
* append more data to the queue. If it appends data to the queue, we need
|
* append more data to the queue. If it appends data to the queue, we need
|
||||||
* to wait for a response before attempting another reroute */
|
* to wait for a response before attempting another reroute */
|
||||||
while (query_queue)
|
while (m_query_queue)
|
||||||
{
|
{
|
||||||
GWBUF* query_queue = modutil_get_next_MySQL_packet(&query_queue);
|
GWBUF* query_queue = modutil_get_next_MySQL_packet(&query_queue);
|
||||||
query_queue = gwbuf_make_contiguous(query_queue);
|
query_queue = gwbuf_make_contiguous(query_queue);
|
||||||
@ -189,7 +189,7 @@ bool RWSplitSession::route_stored_query()
|
|||||||
// TODO: Move the handling of queued queries to the client protocol
|
// TODO: Move the handling of queued queries to the client protocol
|
||||||
// TODO: module where the command tracking is done automatically.
|
// TODO: module where the command tracking is done automatically.
|
||||||
uint8_t cmd = mxs_mysql_get_command(query_queue);
|
uint8_t cmd = mxs_mysql_get_command(query_queue);
|
||||||
mysql_protocol_set_current_command(client_dcb, (mxs_mysql_cmd_t)cmd);
|
mysql_protocol_set_current_command(m_client, (mxs_mysql_cmd_t)cmd);
|
||||||
|
|
||||||
if (!routeQuery(query_queue))
|
if (!routeQuery(query_queue))
|
||||||
{
|
{
|
||||||
@ -217,13 +217,13 @@ bool RWSplitSession::reroute_stored_statement(const SRWBackend& old, GWBUF *stor
|
|||||||
{
|
{
|
||||||
bool success = false;
|
bool success = false;
|
||||||
|
|
||||||
if (!session_trx_is_active(client_dcb->session))
|
if (!session_trx_is_active(m_client->session))
|
||||||
{
|
{
|
||||||
/**
|
/**
|
||||||
* Only try to retry the read if autocommit is enabled and we are
|
* Only try to retry the read if autocommit is enabled and we are
|
||||||
* outside of a transaction
|
* outside of a transaction
|
||||||
*/
|
*/
|
||||||
for (auto it = backends.begin(); it != backends.end(); it++)
|
for (auto it = m_backends.begin(); it != m_backends.end(); it++)
|
||||||
{
|
{
|
||||||
SRWBackend& backend = *it;
|
SRWBackend& backend = *it;
|
||||||
|
|
||||||
@ -236,25 +236,25 @@ bool RWSplitSession::reroute_stored_statement(const SRWBackend& old, GWBUF *stor
|
|||||||
MXS_INFO("Retrying failed read at '%s'.", backend->name());
|
MXS_INFO("Retrying failed read at '%s'.", backend->name());
|
||||||
ss_dassert(backend->get_reply_state() == REPLY_STATE_DONE);
|
ss_dassert(backend->get_reply_state() == REPLY_STATE_DONE);
|
||||||
backend->set_reply_state(REPLY_STATE_START);
|
backend->set_reply_state(REPLY_STATE_START);
|
||||||
expected_responses++;
|
m_expected_responses++;
|
||||||
success = true;
|
success = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!success && current_master && current_master->in_use())
|
if (!success && m_current_master && m_current_master->in_use())
|
||||||
{
|
{
|
||||||
/**
|
/**
|
||||||
* Either we failed to write to the slave or no valid slave was found.
|
* Either we failed to write to the slave or no valid slave was found.
|
||||||
* Try to retry the read on the master.
|
* Try to retry the read on the master.
|
||||||
*/
|
*/
|
||||||
if (current_master->write(stored))
|
if (m_current_master->write(stored))
|
||||||
{
|
{
|
||||||
MXS_INFO("Retrying failed read at '%s'.", current_master->name());
|
MXS_INFO("Retrying failed read at '%s'.", m_current_master->name());
|
||||||
ss_dassert(current_master->get_reply_state() == REPLY_STATE_DONE);
|
ss_dassert(m_current_master->get_reply_state() == REPLY_STATE_DONE);
|
||||||
current_master->set_reply_state(REPLY_STATE_START);
|
m_current_master->set_reply_state(REPLY_STATE_START);
|
||||||
expected_responses++;
|
m_expected_responses++;
|
||||||
success = true;
|
success = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -281,15 +281,15 @@ GWBUF* RWSplitSession::discard_master_wait_gtid_result(GWBUF *buffer)
|
|||||||
/* ignore error packet */
|
/* ignore error packet */
|
||||||
if (MYSQL_GET_COMMAND(header_and_command) == MYSQL_REPLY_ERR)
|
if (MYSQL_GET_COMMAND(header_and_command) == MYSQL_REPLY_ERR)
|
||||||
{
|
{
|
||||||
wait_gtid_state = EXPECTING_NOTHING;
|
m_wait_gtid_state = EXPECTING_NOTHING;
|
||||||
return buffer;
|
return buffer;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* this packet must be an ok packet now */
|
/* this packet must be an ok packet now */
|
||||||
ss_dassert(MYSQL_GET_COMMAND(header_and_command) == MYSQL_REPLY_OK);
|
ss_dassert(MYSQL_GET_COMMAND(header_and_command) == MYSQL_REPLY_OK);
|
||||||
packet_len = MYSQL_GET_PAYLOAD_LEN(header_and_command) + MYSQL_HEADER_LEN;
|
packet_len = MYSQL_GET_PAYLOAD_LEN(header_and_command) + MYSQL_HEADER_LEN;
|
||||||
wait_gtid_state = EXPECTING_REAL_RESULT;
|
m_wait_gtid_state = EXPECTING_REAL_RESULT;
|
||||||
next_seq = 1;
|
m_next_seq = 1;
|
||||||
|
|
||||||
return gwbuf_consume(buffer, packet_len);
|
return gwbuf_consume(buffer, packet_len);
|
||||||
}
|
}
|
||||||
@ -306,7 +306,7 @@ SRWBackend& RWSplitSession::get_backend_from_dcb(DCB *dcb)
|
|||||||
ss_dassert(dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER);
|
ss_dassert(dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER);
|
||||||
CHK_DCB(dcb);
|
CHK_DCB(dcb);
|
||||||
|
|
||||||
for (auto it = backends.begin(); it != backends.end(); it++)
|
for (auto it = m_backends.begin(); it != m_backends.end(); it++)
|
||||||
{
|
{
|
||||||
SRWBackend& backend = *it;
|
SRWBackend& backend = *it;
|
||||||
|
|
||||||
@ -338,14 +338,14 @@ void RWSplitSession::correct_packet_sequence(GWBUF *buffer)
|
|||||||
uint8_t header[3];
|
uint8_t header[3];
|
||||||
uint32_t offset = 0;
|
uint32_t offset = 0;
|
||||||
uint32_t packet_len = 0;
|
uint32_t packet_len = 0;
|
||||||
if (wait_gtid_state == EXPECTING_REAL_RESULT)
|
if (m_wait_gtid_state == EXPECTING_REAL_RESULT)
|
||||||
{
|
{
|
||||||
while (gwbuf_copy_data(buffer, offset, 3, header) == 3)
|
while (gwbuf_copy_data(buffer, offset, 3, header) == 3)
|
||||||
{
|
{
|
||||||
packet_len = MYSQL_GET_PAYLOAD_LEN(header) + MYSQL_HEADER_LEN;
|
packet_len = MYSQL_GET_PAYLOAD_LEN(header) + MYSQL_HEADER_LEN;
|
||||||
uint8_t *seq = gwbuf_byte_pointer(buffer, offset + MYSQL_SEQ_OFFSET);
|
uint8_t *seq = gwbuf_byte_pointer(buffer, offset + MYSQL_SEQ_OFFSET);
|
||||||
*seq = next_seq;
|
*seq = m_next_seq;
|
||||||
next_seq++;
|
m_next_seq++;
|
||||||
offset += packet_len;
|
offset += packet_len;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -391,28 +391,28 @@ void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb)
|
|||||||
|
|
||||||
SRWBackend& backend = get_backend_from_dcb(backend_dcb);
|
SRWBackend& backend = get_backend_from_dcb(backend_dcb);
|
||||||
|
|
||||||
if (rses_config.enable_causal_read &&
|
if (m_config.enable_causal_read &&
|
||||||
GWBUF_IS_REPLY_OK(writebuf) &&
|
GWBUF_IS_REPLY_OK(writebuf) &&
|
||||||
backend == current_master)
|
backend == m_current_master)
|
||||||
{
|
{
|
||||||
/** Save gtid position */
|
/** Save gtid position */
|
||||||
char *tmp = gwbuf_get_property(writebuf, (char *)"gtid");
|
char *tmp = gwbuf_get_property(writebuf, (char *)"gtid");
|
||||||
if (tmp)
|
if (tmp)
|
||||||
{
|
{
|
||||||
gtid_pos = std::string(tmp);
|
m_gtid_pos = std::string(tmp);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (wait_gtid_state == EXPECTING_WAIT_GTID_RESULT)
|
if (m_wait_gtid_state == EXPECTING_WAIT_GTID_RESULT)
|
||||||
{
|
{
|
||||||
ss_dassert(rses_config.enable_causal_read);
|
ss_dassert(m_config.enable_causal_read);
|
||||||
if ((writebuf = discard_master_wait_gtid_result(writebuf)) == NULL)
|
if ((writebuf = discard_master_wait_gtid_result(writebuf)) == NULL)
|
||||||
{
|
{
|
||||||
// Nothing to route, return
|
// Nothing to route, return
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (wait_gtid_state == EXPECTING_REAL_RESULT)
|
if (m_wait_gtid_state == EXPECTING_REAL_RESULT)
|
||||||
{
|
{
|
||||||
correct_packet_sequence(writebuf);
|
correct_packet_sequence(writebuf);
|
||||||
}
|
}
|
||||||
@ -437,15 +437,15 @@ void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb)
|
|||||||
{
|
{
|
||||||
/** Got a complete reply, acknowledge the write and decrement expected response count */
|
/** Got a complete reply, acknowledge the write and decrement expected response count */
|
||||||
backend->ack_write();
|
backend->ack_write();
|
||||||
expected_responses--;
|
m_expected_responses--;
|
||||||
ss_dassert(expected_responses >= 0);
|
ss_dassert(m_expected_responses >= 0);
|
||||||
ss_dassert(backend->get_reply_state() == REPLY_STATE_DONE);
|
ss_dassert(backend->get_reply_state() == REPLY_STATE_DONE);
|
||||||
MXS_INFO("Reply complete, last reply from %s", backend->name());
|
MXS_INFO("Reply complete, last reply from %s", backend->name());
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
MXS_INFO("Reply not yet complete. Waiting for %d replies, got one from %s",
|
MXS_INFO("Reply not yet complete. Waiting for %d replies, got one from %s",
|
||||||
expected_responses, backend->name());
|
m_expected_responses, backend->name());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (backend->have_session_commands())
|
if (backend->have_session_commands())
|
||||||
@ -458,10 +458,10 @@ void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb)
|
|||||||
{
|
{
|
||||||
if (backend->execute_session_command())
|
if (backend->execute_session_command())
|
||||||
{
|
{
|
||||||
expected_responses++;
|
m_expected_responses++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (expected_responses == 0 && query_queue)
|
else if (m_expected_responses == 0 && m_query_queue)
|
||||||
{
|
{
|
||||||
route_stored_query();
|
route_stored_query();
|
||||||
}
|
}
|
||||||
@ -527,10 +527,10 @@ void RWSplitSession::handleError(GWBUF *errmsgbuf, DCB *problem_dcb,
|
|||||||
{
|
{
|
||||||
case ERRACT_NEW_CONNECTION:
|
case ERRACT_NEW_CONNECTION:
|
||||||
{
|
{
|
||||||
if (current_master && current_master->in_use() && current_master == backend)
|
if (m_current_master && m_current_master->in_use() && m_current_master == backend)
|
||||||
{
|
{
|
||||||
/** The connection to the master has failed */
|
/** The connection to the master has failed */
|
||||||
SERVER *srv = current_master->server();
|
SERVER *srv = m_current_master->server();
|
||||||
bool can_continue = false;
|
bool can_continue = false;
|
||||||
|
|
||||||
if (!backend->is_waiting_result())
|
if (!backend->is_waiting_result())
|
||||||
@ -545,7 +545,7 @@ void RWSplitSession::handleError(GWBUF *errmsgbuf, DCB *problem_dcb,
|
|||||||
* can't be sure whether it was executed or not. In this
|
* can't be sure whether it was executed or not. In this
|
||||||
* case the safest thing to do is to close the client
|
* case the safest thing to do is to close the client
|
||||||
* connection. */
|
* connection. */
|
||||||
if (rses_config.master_failure_mode != RW_FAIL_INSTANTLY)
|
if (m_config.master_failure_mode != RW_FAIL_INSTANTLY)
|
||||||
{
|
{
|
||||||
can_continue = true;
|
can_continue = true;
|
||||||
}
|
}
|
||||||
@ -553,15 +553,15 @@ void RWSplitSession::handleError(GWBUF *errmsgbuf, DCB *problem_dcb,
|
|||||||
else
|
else
|
||||||
{
|
{
|
||||||
// We were expecting a response but we aren't going to get one
|
// We were expecting a response but we aren't going to get one
|
||||||
expected_responses--;
|
m_expected_responses--;
|
||||||
|
|
||||||
if (rses_config.master_failure_mode == RW_ERROR_ON_WRITE)
|
if (m_config.master_failure_mode == RW_ERROR_ON_WRITE)
|
||||||
{
|
{
|
||||||
/** In error_on_write mode, the session can continue even
|
/** In error_on_write mode, the session can continue even
|
||||||
* if the master is lost. Send a read-only error to
|
* if the master is lost. Send a read-only error to
|
||||||
* the client to let it know that the query failed. */
|
* the client to let it know that the query failed. */
|
||||||
can_continue = true;
|
can_continue = true;
|
||||||
send_readonly_error(client_dcb);
|
send_readonly_error(m_client);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!SERVER_IS_MASTER(srv) && !srv->master_err_is_logged)
|
if (!SERVER_IS_MASTER(srv) && !srv->master_err_is_logged)
|
||||||
@ -585,7 +585,7 @@ void RWSplitSession::handleError(GWBUF *errmsgbuf, DCB *problem_dcb,
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
if (target_node && target_node == backend &&
|
if (m_target_node && m_target_node == backend &&
|
||||||
session_trx_is_read_only(problem_dcb->session))
|
session_trx_is_read_only(problem_dcb->session))
|
||||||
{
|
{
|
||||||
/**
|
/**
|
||||||
@ -645,8 +645,8 @@ bool RWSplitSession::handle_error_new_connection(DCB *backend_dcb, GWBUF *errmsg
|
|||||||
|
|
||||||
if (backend->is_waiting_result())
|
if (backend->is_waiting_result())
|
||||||
{
|
{
|
||||||
ss_dassert(expected_responses > 0);
|
ss_dassert(m_expected_responses > 0);
|
||||||
expected_responses--;
|
m_expected_responses--;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A query was sent through the backend and it is waiting for a reply.
|
* A query was sent through the backend and it is waiting for a reply.
|
||||||
@ -677,7 +677,7 @@ bool RWSplitSession::handle_error_new_connection(DCB *backend_dcb, GWBUF *errmsg
|
|||||||
client_dcb->func.write(client_dcb, gwbuf_clone(errmsg));
|
client_dcb->func.write(client_dcb, gwbuf_clone(errmsg));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (expected_responses == 0)
|
if (m_expected_responses == 0)
|
||||||
{
|
{
|
||||||
/** The response from this server was the last one, try to
|
/** The response from this server was the last one, try to
|
||||||
* route all queued queries */
|
* route all queued queries */
|
||||||
@ -697,22 +697,22 @@ bool RWSplitSession::handle_error_new_connection(DCB *backend_dcb, GWBUF *errmsg
|
|||||||
route_stored_query();
|
route_stored_query();
|
||||||
}
|
}
|
||||||
|
|
||||||
int max_nslaves = router->max_slave_count();
|
int max_nslaves = m_router->max_slave_count();
|
||||||
bool succp;
|
bool succp;
|
||||||
/**
|
/**
|
||||||
* Try to get replacement slave or at least the minimum
|
* Try to get replacement slave or at least the minimum
|
||||||
* number of slave connections for router session.
|
* number of slave connections for router session.
|
||||||
*/
|
*/
|
||||||
if (recv_sescmd > 0 && rses_config.disable_sescmd_history)
|
if (m_recv_sescmd > 0 && m_config.disable_sescmd_history)
|
||||||
{
|
{
|
||||||
succp = router->have_enough_servers();
|
succp = m_router->have_enough_servers();
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
succp = router->select_connect_backend_servers(ses, backends,
|
succp = m_router->select_connect_backend_servers(ses, m_backends,
|
||||||
current_master,
|
m_current_master,
|
||||||
&sescmd_list,
|
&m_sescmd_list,
|
||||||
&expected_responses,
|
&m_expected_responses,
|
||||||
connection_type::SLAVE);
|
connection_type::SLAVE);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -729,15 +729,15 @@ bool RWSplitSession::handle_error_new_connection(DCB *backend_dcb, GWBUF *errmsg
|
|||||||
*/
|
*/
|
||||||
void RWSplitSession::handle_error_reply_client(DCB *backend_dcb, GWBUF *errmsg)
|
void RWSplitSession::handle_error_reply_client(DCB *backend_dcb, GWBUF *errmsg)
|
||||||
{
|
{
|
||||||
mxs_session_state_t sesstate = client_dcb->session->state;
|
mxs_session_state_t sesstate = m_client->session->state;
|
||||||
SRWBackend& backend = get_backend_from_dcb(backend_dcb);
|
SRWBackend& backend = get_backend_from_dcb(backend_dcb);
|
||||||
|
|
||||||
backend->close();
|
backend->close();
|
||||||
|
|
||||||
if (sesstate == SESSION_STATE_ROUTER_READY)
|
if (sesstate == SESSION_STATE_ROUTER_READY)
|
||||||
{
|
{
|
||||||
CHK_DCB(client_dcb);
|
CHK_DCB(m_client);
|
||||||
client_dcb->func.write(client_dcb, gwbuf_clone(errmsg));
|
m_client->func.write(m_client, gwbuf_clone(errmsg));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -747,9 +747,9 @@ uint32_t get_internal_ps_id(RWSplitSession* rses, GWBUF* buffer)
|
|||||||
|
|
||||||
// All COM_STMT type statements store the ID in the same place
|
// All COM_STMT type statements store the ID in the same place
|
||||||
uint32_t id = mxs_mysql_extract_ps_id(buffer);
|
uint32_t id = mxs_mysql_extract_ps_id(buffer);
|
||||||
ClientHandleMap::iterator it = rses->ps_handles.find(id);
|
ClientHandleMap::iterator it = rses->m_ps_handles.find(id);
|
||||||
|
|
||||||
if (it != rses->ps_handles.end())
|
if (it != rses->m_ps_handles.end())
|
||||||
{
|
{
|
||||||
rval = it->second;
|
rval = it->second;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -94,33 +94,33 @@ public:
|
|||||||
bool* pSuccess);
|
bool* pSuccess);
|
||||||
|
|
||||||
// TODO: Make member variables private
|
// TODO: Make member variables private
|
||||||
mxs::SRWBackendList backends; /**< List of backend servers */
|
mxs::SRWBackendList m_backends; /**< List of backend servers */
|
||||||
mxs::SRWBackend current_master; /**< Current master server */
|
mxs::SRWBackend m_current_master; /**< Current master server */
|
||||||
mxs::SRWBackend target_node; /**< The currently locked target node */
|
mxs::SRWBackend m_target_node; /**< The currently locked target node */
|
||||||
mxs::SRWBackend prev_target; /**< The previous target where a query was sent */
|
mxs::SRWBackend m_prev_target; /**< The previous target where a query was sent */
|
||||||
bool large_query; /**< Set to true when processing payloads >= 2^24 bytes */
|
bool m_large_query; /**< Set to true when processing payloads >= 2^24 bytes */
|
||||||
Config rses_config; /**< copied config info from router instance */
|
Config m_config; /**< copied config info from router instance */
|
||||||
int rses_nbackends;
|
int m_nbackends; /**< Number of backend servers (obsolete) */
|
||||||
enum ld_state load_data_state; /**< Current load data state */
|
enum ld_state m_load_data_state; /**< Current load data state */
|
||||||
bool have_tmp_tables;
|
bool m_have_tmp_tables; /**< True if temp tables have been created */
|
||||||
uint64_t rses_load_data_sent; /**< How much data has been sent */
|
uint64_t m_load_data_sent; /**< How much data has been sent */
|
||||||
DCB* client_dcb;
|
DCB* m_client; /**< The client DCB */
|
||||||
uint64_t sescmd_count;
|
uint64_t m_sescmd_count; /**< Number of executed session commands */
|
||||||
int expected_responses; /**< Number of expected responses to the current query */
|
int m_expected_responses; /**< Number of expected responses to the current query */
|
||||||
GWBUF* query_queue; /**< Queued commands waiting to be executed */
|
GWBUF* m_query_queue; /**< Queued commands waiting to be executed */
|
||||||
RWSplit* router; /**< The router instance */
|
RWSplit* m_router; /**< The router instance */
|
||||||
TableSet temp_tables; /**< Set of temporary tables */
|
TableSet m_temp_tables; /**< Set of temporary tables */
|
||||||
mxs::SessionCommandList sescmd_list; /**< List of executed session commands */
|
mxs::SessionCommandList m_sescmd_list; /**< List of executed session commands */
|
||||||
ResponseMap sescmd_responses; /**< Response to each session command */
|
ResponseMap m_sescmd_responses; /**< Response to each session command */
|
||||||
SlaveResponseList slave_responses; /**< Slaves that replied before the master */
|
SlaveResponseList m_slave_responses; /**< Slaves that replied before the master */
|
||||||
uint64_t sent_sescmd; /**< ID of the last sent session command*/
|
uint64_t m_sent_sescmd; /**< ID of the last sent session command*/
|
||||||
uint64_t recv_sescmd; /**< ID of the most recently completed session command */
|
uint64_t m_recv_sescmd; /**< ID of the most recently completed session command */
|
||||||
PSManager ps_manager; /**< Prepared statement manager*/
|
PSManager m_ps_manager; /**< Prepared statement manager*/
|
||||||
ClientHandleMap ps_handles; /**< Client PS handle to internal ID mapping */
|
ClientHandleMap m_ps_handles; /**< Client PS handle to internal ID mapping */
|
||||||
ExecMap exec_map; /**< Map of COM_STMT_EXECUTE statement IDs to Backends */
|
ExecMap m_exec_map; /**< Map of COM_STMT_EXECUTE statement IDs to Backends */
|
||||||
std::string gtid_pos; /**< Gtid position for causal read */
|
std::string m_gtid_pos; /**< Gtid position for causal read */
|
||||||
wait_gtid_state_t wait_gtid_state; /**< Determine boundray of wait gtid result and client query result */
|
wait_gtid_state_t m_wait_gtid_state; /**< Determine boundray of wait gtid result and client query result */
|
||||||
uint32_t next_seq; /**< Next packet'ssequence number */
|
uint32_t m_next_seq; /**< Next packet'ssequence number */
|
||||||
|
|
||||||
private:
|
private:
|
||||||
RWSplitSession(RWSplit* instance, MXS_SESSION* session,
|
RWSplitSession(RWSplit* instance, MXS_SESSION* session,
|
||||||
@ -170,7 +170,7 @@ private:
|
|||||||
*/
|
*/
|
||||||
inline bool locked_to_master() const
|
inline bool locked_to_master() const
|
||||||
{
|
{
|
||||||
return large_query || (current_master && target_node == current_master);
|
return m_large_query || (m_current_master && m_target_node == m_current_master);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user