Merge branch '2.3' into develop

This commit is contained in:
Markus Mäkelä
2019-03-07 16:21:03 +02:00
13 changed files with 69 additions and 13 deletions

View File

@ -53,7 +53,7 @@ now secure and can be used across networks.
## Requesting Data ## Requesting Data
**Note:** For the sake of brevity, the rest of this tutorial will omit the **Note:** For the sake of brevity, the rest of this tutorial will omit the
TLS/SSL options for the `curl` command line. For more information, refer to the TLS/SSL options from the `curl` command line. For more information, refer to the
`curl` manpage. `curl` manpage.
The most basic task to do with the REST API is to see whether MaxScale is up and The most basic task to do with the REST API is to see whether MaxScale is up and
@ -272,8 +272,7 @@ execute the following command.
curl -X PATCH -d @server1.txt 127.0.0.1:8989/v1/servers/server1 curl -X PATCH -d @server1.txt 127.0.0.1:8989/v1/servers/server1
``` ```
To verify that the data was updated correctly, request the updated created To verify that the data was updated correctly, request the updated object.
object.
``` ```
curl 127.0.0.1:8989/v1/servers/server1 curl 127.0.0.1:8989/v1/servers/server1

View File

@ -45,6 +45,7 @@ enum gwbuf_type_t
GWBUF_TYPE_RESULT = (1 << 3), GWBUF_TYPE_RESULT = (1 << 3),
GWBUF_TYPE_REPLY_OK = (1 << 4), GWBUF_TYPE_REPLY_OK = (1 << 4),
GWBUF_TYPE_REPLAYED = (1 << 5), GWBUF_TYPE_REPLAYED = (1 << 5),
GWBUF_TYPE_TRACK_STATE = (1 << 6),
}; };
#define GWBUF_IS_TYPE_UNDEFINED(b) ((b)->gwbuf_type == 0) #define GWBUF_IS_TYPE_UNDEFINED(b) ((b)->gwbuf_type == 0)
@ -56,6 +57,9 @@ enum gwbuf_type_t
// True if the query is not initiated by the client but an internal replaying mechanism // True if the query is not initiated by the client but an internal replaying mechanism
#define GWBUF_IS_REPLAYED(b) ((b)->gwbuf_type & GWBUF_TYPE_REPLAYED) #define GWBUF_IS_REPLAYED(b) ((b)->gwbuf_type & GWBUF_TYPE_REPLAYED)
// Track session state change response
#define GWBUF_SHOULD_TRACK_STATE(b) ((b)->gwbuf_type & GWBUF_TYPE_TRACK_STATE)
enum gwbuf_info_t enum gwbuf_info_t
{ {
GWBUF_INFO_NONE = 0x0, GWBUF_INFO_NONE = 0x0,

View File

@ -336,6 +336,7 @@ typedef struct
GWBUF* stored_query; /*< Temporarily stored queries */ GWBUF* stored_query; /*< Temporarily stored queries */
bool collect_result; /*< Collect the next result set as one buffer */ bool collect_result; /*< Collect the next result set as one buffer */
bool changing_user; bool changing_user;
bool track_state; /*< Track session state */
uint32_t num_eof_packets; /*< Encountered eof packet number, used for check uint32_t num_eof_packets; /*< Encountered eof packet number, used for check
* packet type */ * packet type */
bool large_query; /*< Whether to ignore the command byte of the next bool large_query; /*< Whether to ignore the command byte of the next

View File

@ -17,6 +17,7 @@ user=maxskysql
password=skysql password=skysql
auth_all_servers=1 auth_all_servers=1
ignore_databases_regex=.* ignore_databases_regex=.*
refresh_interval=5
[Sharding-Listener] [Sharding-Listener]
type=listener type=listener

View File

@ -75,11 +75,11 @@ int main(int argc, char* argv[])
"GRANT SELECT,USAGE,CREATE ON shard_db.* TO 'user%d'@'%%'", "GRANT SELECT,USAGE,CREATE ON shard_db.* TO 'user%d'@'%%'",
i), i),
"Query should succeed."); "Query should succeed.");
execute_query(Test->repl->nodes[i], "FLUSH PRIVILEGES");
} }
Test->repl->close_connections(); Test->repl->close_connections();
Test->stop_timeout(); Test->stop_timeout();
sleep(10);
MYSQL* conn; MYSQL* conn;
for (i = 0; i < Test->repl->N; i++) for (i = 0; i < Test->repl->N; i++)
@ -97,6 +97,7 @@ int main(int argc, char* argv[])
Test->add_result(execute_query(conn, "CREATE TABLE table%d (x1 int, fl int);", i), Test->add_result(execute_query(conn, "CREATE TABLE table%d (x1 int, fl int);", i),
"Query should succeed."); "Query should succeed.");
} }
sleep(6); // The router is configured to refresh the shard map if older than 5 seconds.
for (i = 0; i < Test->repl->N; i++) for (i = 0; i < Test->repl->N; i++)
{ {

View File

@ -1439,7 +1439,7 @@ bool Service::dump_config(const char* filename) const
for (const auto& f : m_filters) for (const auto& f : m_filters)
{ {
dprintf(file, "%s%s", sep, f->name.c_str()); dprintf(file, "%s%s", sep, f->name.c_str());
sep = ","; sep = "|";
} }
dprintf(file, "\n"); dprintf(file, "\n");

View File

@ -439,6 +439,8 @@ static inline void prepare_for_write(DCB* dcb, GWBUF* buffer)
{ {
proto->collect_result = true; proto->collect_result = true;
} }
proto->track_state = GWBUF_SHOULD_TRACK_STATE(buffer);
} }
/******************************************************************************* /*******************************************************************************
@ -817,7 +819,8 @@ static int gw_read_and_write(DCB* dcb)
* The OK packets sent in response to COM_STMT_PREPARE are of a different * The OK packets sent in response to COM_STMT_PREPARE are of a different
* format so we need to detect and skip them. */ * format so we need to detect and skip them. */
if (rcap_type_required(capabilities, RCAP_TYPE_SESSION_STATE_TRACKING) if (rcap_type_required(capabilities, RCAP_TYPE_SESSION_STATE_TRACKING)
&& !expecting_ps_response(proto)) && !expecting_ps_response(proto)
&& proto->track_state)
{ {
mxs_mysql_get_session_track_info(tmp, proto); mxs_mysql_get_session_track_info(tmp, proto);
} }

View File

@ -62,6 +62,7 @@ MySQLProtocol* mysql_protocol_init(DCB* dcb, int fd)
p->changing_user = false; p->changing_user = false;
p->num_eof_packets = 0; p->num_eof_packets = 0;
p->large_query = false; p->large_query = false;
p->track_state = false;
/*< Assign fd with protocol */ /*< Assign fd with protocol */
p->fd = fd; p->fd = fd;
p->owner_dcb = dcb; p->owner_dcb = dcb;

View File

@ -277,6 +277,11 @@ bool RWSplitSession::route_single_stmt(GWBUF* querybuf)
} }
else if (TARGET_IS_MASTER(route_target)) else if (TARGET_IS_MASTER(route_target))
{ {
if (m_config.causal_reads)
{
gwbuf_set_type(querybuf, GWBUF_TYPE_TRACK_STATE);
}
succp = handle_master_is_target(&target); succp = handle_master_is_target(&target);
if (!succp && should_migrate_trx(target)) if (!succp && should_migrate_trx(target))

View File

@ -22,6 +22,7 @@
#include <functional> #include <functional>
#include <random> #include <random>
#include <iostream> #include <iostream>
#include <array>
#include <maxbase/stopwatch.hh> #include <maxbase/stopwatch.hh>
#include <maxscale/router.hh> #include <maxscale/router.hh>
@ -193,6 +194,11 @@ BackendSelectFunction get_backend_select_function(select_criteria_t sc)
return backend_cmp_current_load; return backend_cmp_current_load;
} }
namespace
{
// Buffers for sorting the servers, thread_local to avoid repeated memory allocations
thread_local std::array<PRWBackends, 3> priority_map;
}
/** /**
* @brief Find the best slave candidate for routing reads. * @brief Find the best slave candidate for routing reads.
@ -208,7 +214,6 @@ PRWBackends::iterator find_best_backend(PRWBackends& backends,
bool masters_accepts_reads) bool masters_accepts_reads)
{ {
// Group backends by priority. The set of highest priority backends will then compete. // Group backends by priority. The set of highest priority backends will then compete.
std::map<int, PRWBackends> priority_map;
int best_priority {INT_MAX}; // low numbers are high priority int best_priority {INT_MAX}; // low numbers are high priority
for (auto& psBackend : backends) for (auto& psBackend : backends)
@ -222,16 +227,16 @@ PRWBackends::iterator find_best_backend(PRWBackends& backends,
{ {
if (!is_busy) if (!is_busy)
{ {
priority = 1; // highest priority, idle servers priority = 0; // highest priority, idle servers
} }
else else
{ {
priority = 13; // lowest priority, busy servers priority = 2; // lowest priority, busy servers
} }
} }
else else
{ {
priority = 2; // idle masters with masters_accept_reads==false priority = 1; // idle masters with masters_accept_reads==false
} }
priority_map[priority].push_back(psBackend); priority_map[priority].push_back(psBackend);
@ -239,8 +244,14 @@ PRWBackends::iterator find_best_backend(PRWBackends& backends,
} }
auto best = select(priority_map[best_priority]); auto best = select(priority_map[best_priority]);
auto rval = std::find(backends.begin(), backends.end(), *best);
return std::find(backends.begin(), backends.end(), *best); for (auto& a : priority_map)
{
a.clear();
}
return rval;
} }
/** /**

View File

@ -563,6 +563,17 @@ static bool server_is_shutting_down(GWBUF* writebuf)
return err == ER_SERVER_SHUTDOWN || err == ER_NORMAL_SHUTDOWN || err == ER_SHUTDOWN_COMPLETE; return err == ER_SERVER_SHUTDOWN || err == ER_NORMAL_SHUTDOWN || err == ER_SHUTDOWN_COMPLETE;
} }
void RWSplitSession::close_stale_connections()
{
for (auto& backend : m_backends)
{
if (backend->in_use() && !backend->can_connect())
{
backend->close();
}
}
}
void RWSplitSession::clientReply(GWBUF* writebuf, DCB* backend_dcb) void RWSplitSession::clientReply(GWBUF* writebuf, DCB* backend_dcb)
{ {
DCB* client_dcb = backend_dcb->session->client_dcb; DCB* client_dcb = backend_dcb->session->client_dcb;
@ -719,6 +730,15 @@ void RWSplitSession::clientReply(GWBUF* writebuf, DCB* backend_dcb)
m_can_replay_trx = true; m_can_replay_trx = true;
} }
if (m_expected_responses == 0)
{
/**
* Close stale connections to servers in maintenance. Done here to avoid closing the connections
* before all responses have been received.
*/
close_stale_connections();
}
if (backend->in_use() && backend->has_session_commands()) if (backend->in_use() && backend->has_session_commands())
{ {
// Backend is still in use and has more session commands to execute // Backend is still in use and has more session commands to execute

View File

@ -144,6 +144,7 @@ private:
void continue_large_session_write(GWBUF* querybuf, uint32_t type); void continue_large_session_write(GWBUF* querybuf, uint32_t type);
bool route_single_stmt(GWBUF* querybuf); bool route_single_stmt(GWBUF* querybuf);
bool route_stored_query(); bool route_stored_query();
void close_stale_connections();
mxs::RWBackend* get_hinted_backend(char* name); mxs::RWBackend* get_hinted_backend(char* name);
mxs::RWBackend* get_slave_backend(int max_rlag); mxs::RWBackend* get_slave_backend(int max_rlag);

View File

@ -1592,8 +1592,17 @@ bool SchemaRouterSession::send_tables(GWBUF* pPacket)
if (database.empty()) if (database.empty())
{ {
MXS_FREE(query); // Was not a "show tables from x". If a current database is selected, use that as target.
return false; if (!m_current_db.empty())
{
database = m_current_db;
}
else
{
// No current db, route the query to a server, likely getting "No database selected"
MXS_FREE(query);
return false;
}
} }
ServerMap tablelist; ServerMap tablelist;