MXS-2555 Use mxs_mysql_execute_kill_all_others in smartrouter
This commit is contained in:
@ -291,7 +291,7 @@ void SmartRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb)
|
|||||||
m_router.perf_update(m_measurement.canonical, {cluster.host, query_dur});
|
m_router.perf_update(m_measurement.canonical, {cluster.host, query_dur});
|
||||||
// If the query is still going on, an error packet is received, else the
|
// If the query is still going on, an error packet is received, else the
|
||||||
// whole query might play out (and be discarded).
|
// whole query might play out (and be discarded).
|
||||||
kill_all_others_v2(cluster.host);
|
kill_all_others(cluster);
|
||||||
}
|
}
|
||||||
|
|
||||||
m_mode = Mode::CollectResults;
|
m_mode = Mode::CollectResults;
|
||||||
@ -438,92 +438,15 @@ bool SmartRouterSession::write_split_packets(GWBUF* pBuf)
|
|||||||
return true; // TODO. What could possibly go wrong?
|
return true; // TODO. What could possibly go wrong?
|
||||||
}
|
}
|
||||||
|
|
||||||
/* TODO This should work much the way that kill_all_others_v2 works, but it does
|
void SmartRouterSession::kill_all_others(const Cluster& cluster)
|
||||||
* not. Something funky happens to the dcb/pipeline when this is used.
|
|
||||||
* Leaving it here, since it should be fixed.
|
|
||||||
*/
|
|
||||||
void SmartRouterSession::kill_all_others_v1(const maxbase::Host& host)
|
|
||||||
{
|
{
|
||||||
MySQLProtocol* pProt = static_cast<MySQLProtocol*>(m_pClient_dcb->protocol);
|
MySQLProtocol* proto = static_cast<MySQLProtocol*>(cluster.pDcb->protocol);
|
||||||
uint64_t mysql_thread_id = pProt->thread_id;
|
int keep_protocol_thread_id = proto->thread_id;
|
||||||
|
|
||||||
for (Cluster& cluster : m_clusters)
|
mxs_mysql_execute_kill_all_others(cluster.pDcb->session, cluster.pDcb->session->ses_id,
|
||||||
{
|
keep_protocol_thread_id, KT_QUERY);
|
||||||
if (cluster.host == host || !cluster.tracker.expecting_response_packets())
|
|
||||||
{
|
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
MXS_SDEBUG("Queue " << cluster.host << " mysql_thread_id=" << mysql_thread_id << " for kill");
|
|
||||||
mxs_mysql_execute_kill(cluster.pDcb->session, mysql_thread_id, KT_QUERY);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct KillStruct
|
|
||||||
{
|
|
||||||
std::string user;
|
|
||||||
std::string password;
|
|
||||||
maxbase::Host host;
|
|
||||||
int mysql_thread_id;
|
|
||||||
};
|
|
||||||
using KillStructs = std::vector<KillStruct>;
|
|
||||||
|
|
||||||
void kill_thread(const KillStructs& kill_structs)
|
|
||||||
{
|
|
||||||
for (auto& ks : kill_structs)
|
|
||||||
{
|
|
||||||
auto conn = mysql_init(nullptr);
|
|
||||||
if (mysql_real_connect(conn, ks.host.address().c_str(),
|
|
||||||
ks.user.c_str(), ks.password.c_str(),
|
|
||||||
"", ks.host.port(), nullptr, 0) == nullptr)
|
|
||||||
{
|
|
||||||
MXS_SERROR("Trying to kill query on " << ks.host << " but failed to connect");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
std::ostringstream os;
|
|
||||||
os << "kill query " << ks.mysql_thread_id;
|
|
||||||
auto sql = os.str();
|
|
||||||
MXS_SINFO("Sending '" << sql << "' to " << ks.host);
|
|
||||||
mysql_real_query(conn, sql.c_str(), sql.size());
|
|
||||||
auto err_code = mysql_errno(conn);
|
|
||||||
if (err_code)
|
|
||||||
{
|
|
||||||
MXS_SERROR("Failed to send kill err code=" << err_code);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void SmartRouterSession::kill_all_others_v2(const maxbase::Host& host)
|
|
||||||
{
|
|
||||||
MySQLProtocol* pProt = static_cast<MySQLProtocol*>(m_pClient_dcb->protocol);
|
|
||||||
int mysql_thread_id = pProt->thread_id;
|
|
||||||
|
|
||||||
KillStructs kill_structs;
|
|
||||||
for (Cluster& cluster : m_clusters)
|
|
||||||
{
|
|
||||||
if (cluster.host == host || !cluster.tracker.expecting_response_packets())
|
|
||||||
{
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
// TODO TODO: Where do the user and password come from? And also, open
|
|
||||||
// a permanent connection to each Cluster for killing.
|
|
||||||
std::string TODO_user = "maxscale";
|
|
||||||
std::string TODO_password = "pass";
|
|
||||||
|
|
||||||
kill_structs.push_back(KillStruct {TODO_user, TODO_password,
|
|
||||||
cluster.host, mysql_thread_id});
|
|
||||||
MXS_SDEBUG("Queue " << cluster.host << " mysql_thread_id=" << mysql_thread_id << " for kill");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!kill_structs.empty())
|
|
||||||
{
|
|
||||||
std::thread murderer {kill_thread, kill_structs};
|
|
||||||
murderer.detach();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void SmartRouterSession::handleError(GWBUF* pPacket,
|
void SmartRouterSession::handleError(GWBUF* pPacket,
|
||||||
DCB* pProblem,
|
DCB* pProblem,
|
||||||
mxs_error_action_t action,
|
mxs_error_action_t action,
|
||||||
|
|||||||
@ -93,8 +93,7 @@ private:
|
|||||||
bool write_to_all(GWBUF* pBuf, Mode mode);
|
bool write_to_all(GWBUF* pBuf, Mode mode);
|
||||||
bool write_split_packets(GWBUF* pBuf);
|
bool write_split_packets(GWBUF* pBuf);
|
||||||
|
|
||||||
void kill_all_others_v1(const maxbase::Host& host);
|
void kill_all_others(const Cluster& cluster);
|
||||||
void kill_all_others_v2(const maxbase::Host& host);
|
|
||||||
|
|
||||||
bool expecting_request_packets() const;
|
bool expecting_request_packets() const;
|
||||||
bool expecting_response_packets() const;
|
bool expecting_response_packets() const;
|
||||||
|
|||||||
Reference in New Issue
Block a user