MXS-2208 Introduce asynchronous GET

A multi HTTP GET can now be performed so that the caller
drives the polling of results.

This can now be used so that inside a monitor worker, the delays
are handled using delayed calls. Consequently, the monitor worker
event loop can remain responsive even though the Clustrix nodes
are being polled.
This commit is contained in:
Johan Wikman
2018-12-17 15:45:56 +02:00
parent 38a0d6a2df
commit 847f53b21b
3 changed files with 589 additions and 61 deletions

View File

@ -91,7 +91,162 @@ std::vector<Result> get(const std::vector<std::string>& urls,
std::vector<Result> get(const std::vector<std::string>& urls,
const std::string& user, const std::string& password,
const Config& config = Config());
}
/**
* @class mxb::http::Async
*
* Class for performing multiple HTTP GETs concurrently and asynchronously.
* The instance should be viewed as a handle to the operation. If it is
* copied, both instances refer to the same operation and both instances
* can be used for driving the GET. However, an instance can *only* be
* used or copied in the thread where it is created.
*/
class Async
{
public:
enum status_t
{
READY, // The result is ready.
ERROR, // The operation has failed.
PENDING // The operation is pending.
};
class Imp
{
public:
virtual ~Imp();
virtual status_t status() const = 0;
virtual status_t perform(long timeout_ms) = 0;
virtual long wait_no_more_than() const = 0;
virtual const std::vector<Result>& results() const = 0;
};
/**
* Defalt constructor creates an asynchronous operation whose status is ERROR.
*/
Async();
/**
* Copy constructor; creates an instance that refers to the same operation
* the argument refers to.
*
* @param rhs An existing @c Async instance.
*/
Async(const Async& rhs)
: m_sImp(rhs.m_sImp)
{
}
/**
* Assigns an asynchronous operation.
*
* @param rhs An other @c Async instance.
*
* @return *this.
*/
Async& operator = (const Async& rhs)
{
std::shared_ptr<Imp> sImp(rhs.m_sImp);
m_sImp.swap(sImp);
return *this;
}
/**
* Return the status of the operation.
*
* @return @c READY|ERROR|PENDING
*/
status_t status() const
{
return m_sImp->status();
}
/**
* Performs a step in the operation.
*
* @param timeout_ms The maximum timeout for waiting for activity on the
* underlying socket descriptors.
*
* @return @c READY|ERROR|PENDING
*/
status_t perform(long timeout_ms = 0)
{
return m_sImp->perform(timeout_ms);
}
/**
* How much time to wait at most, before calling perform() again.
*
* This value is dependent upon the timeouts that were specified when
* the operation was initiated. To ensure that operations are not timed
* out, do not wait as long as this function returns but significantly
* less.
*
* @return Maximum time to wait in milliseconds.
*/
long wait_no_more_than() const
{
return m_sImp->wait_no_more_than();
}
/**
* The result of each operation. This function should not be called
* before the status is READY.
*
* @return Vector of results.
*/
const std::vector<Result>& results() const
{
return m_sImp->results();
}
public:
Async(const std::shared_ptr<Imp>& sImp)
: m_sImp(sImp)
{
}
private:
std::shared_ptr<Imp> m_sImp;
};
/**
* Return human-readable string for a status value.
*
* @param status A status value.
*
* @return The corresponding string.
*/
const char* to_string(Async::status_t status);
/**
* Do a HTTP GET, asynchronously.
*
* @param urls The URLs to GET.
* @param config The config to use.
*
* @return An Async instance using which the operation can be performed.
*/
Async get_async(const std::vector<std::string>& urls,
const Config& config = Config());
/**
* Do a HTTP GET, asynchronously.
*
* @param urls The URLs to GET.
* @param user Username to use.
* @param password Password for the user.
* @param config The config to use.
*
* @return An Async instance using which the operation can be performed.
*/
Async get_async(const std::vector<std::string>& urls,
const std::string& user, const std::string& password,
const Config& config = Config());
}
}

View File

@ -14,11 +14,12 @@
#include <maxbase/http.hh>
#include <algorithm>
#include <array>
#include <map>
#include <unordered_map>
#include <utility>
#include <curl/curl.h>
#include <maxbase/assert.h>
#include <maxbase/string.hh>
#include <iostream>
using namespace std;
using std::array;
@ -128,53 +129,6 @@ CURL* get_easy_curl(const std::string& url,
return pCurl;
}
}
namespace maxbase
{
namespace http
{
Result get(const std::string& url, const Config& config)
{
return std::move(get(url, "", "", config));
}
Result get(const std::string& url, const std::string& user, const std::string& password, const Config& config)
{
Result res;
char errbuf[CURL_ERROR_SIZE + 1] = "";
CURL* pCurl = get_easy_curl(url, user, password, config, &res, errbuf);
mxb_assert(pCurl);
if (curl_easy_perform(pCurl) == CURLE_OK)
{
long code = 0; // needs to be a long
curl_easy_getinfo(pCurl, CURLINFO_RESPONSE_CODE, &code);
res.code = code;
}
else
{
res.code = -1;
res.body = errbuf;
}
curl_easy_cleanup(pCurl);
return res;
}
vector<Result> get(const std::vector<std::string>& urls, const Config& config)
{
return get(urls, "", "", config);
}
namespace
{
using Errbuf = array<char, CURL_ERROR_SIZE + 1>;
struct Context
@ -290,6 +244,11 @@ void execute(CURLM* pCurlm,
curl_easy_getinfo(pCurl, CURLINFO_RESPONSE_CODE, &code);
pResult->code = code;
if ((code == 0) && pResult->body.empty())
{
pResult->body = pErrbuf->data();
}
curls.erase(it);
curl_multi_remove_handle(pCurlm, pCurl);
curl_easy_cleanup(pCurl);
@ -356,6 +315,347 @@ vector<Result> get_curlm(CURLM* pCurlm,
return results;
}
class ErrorImp : public Async::Imp
{
public:
ErrorImp()
{
}
Async::status_t status() const
{
return Async::ERROR;
}
Async::status_t perform(long timeout_ms)
{
return Async::ERROR;
}
long wait_no_more_than() const
{
return 0;
}
const std::vector<Result>& results() const
{
return m_results;
}
private:
vector<Result> m_results;
};
class HttpImp : public Async::Imp
{
public:
HttpImp()
: m_pCurlm(curl_multi_init())
, m_status(Async::ERROR)
, m_still_running(0)
, m_wait_no_more_than(0)
{
mxb_assert(m_pCurlm);
if (!m_pCurlm)
{
throw std::bad_alloc();
}
}
~HttpImp()
{
mxb_assert(m_pCurlm);
for (auto& item : m_curls)
{
CURL* pCurl = item.first;
MXB_AT_DEBUG(CURLMcode rv =) curl_multi_remove_handle(m_pCurlm, pCurl);
mxb_assert(rv == CURLM_OK);
curl_easy_cleanup(pCurl);
}
CURLMcode code = curl_multi_cleanup(m_pCurlm);
if (code != CURLM_OK)
{
MXB_ERROR("curl_multi_cleanup() failed: %s", curl_multi_strerror(code));
}
}
bool initialize(const std::vector<std::string>& urls,
const std::string& user, const std::string& password,
const Config& config)
{
mxb_assert(m_status == Async::ERROR);
m_results.reserve(urls.size());
m_errbufs.reserve(urls.size());
size_t i;
for (i = 0; i < urls.size(); ++i)
{
m_results.resize(i + 1);
m_errbufs.resize(i + 1);
CURL* pCurl = get_easy_curl(urls[i], user, password, config, &m_results[i], m_errbufs[i].data());
if (!pCurl || (curl_multi_add_handle(m_pCurlm, pCurl) != CURLM_OK))
{
mxb_assert(!true);
if (pCurl)
{
curl_easy_cleanup(pCurl);
}
m_results.resize(m_results.size() - 1);
break;
}
else
{
m_curls.insert(std::make_pair(pCurl, Context(&m_results[i], &m_errbufs[i])));
}
}
if (m_results.size() == urls.size())
{
CURLMcode rv = curl_multi_perform(m_pCurlm, &m_still_running);
if (rv == CURLM_OK)
{
if (m_still_running == 0)
{
m_status = Async::READY;
m_wait_no_more_than = 0;
}
else
{
update_timeout();
m_status = Async::PENDING;
}
}
else
{
MXB_ERROR("curl_multi_perform() failed: %s", curl_multi_strerror(rv));
m_status = Async::ERROR;
}
}
return m_status != Async::ERROR;
}
Async::status_t status() const override
{
return m_status;
}
Async::status_t perform(long timeout_ms) override
{
switch (m_status)
{
case Async::READY:
break;
case Async::ERROR:
mxb_assert(!true);
break;
default:
{
mxb_assert(m_status == Async::PENDING);
fd_set fdread;
fd_set fdwrite;
fd_set fdexcep;
FD_ZERO(&fdread);
FD_ZERO(&fdwrite);
FD_ZERO(&fdexcep);
int maxfd;
CURLMcode rv_curl = curl_multi_fdset(m_pCurlm, &fdread, &fdwrite, &fdexcep, &maxfd);
if (rv_curl == CURLM_OK)
{
int rv = 0;
if (maxfd != -1)
{
struct timeval timeout = { timeout_ms / 1000, (timeout_ms % 1000) * 1000 };
rv = select(maxfd + 1, &fdread, &fdwrite, &fdexcep, &timeout);
}
switch (rv)
{
case -1:
mxb_assert(!true);
MXB_ERROR("select() failed: %s", mxb_strerror(errno));
m_status = Async::ERROR;
break;
case 0:
default:
rv_curl = curl_multi_perform(m_pCurlm, &m_still_running);
if (rv_curl == CURLM_OK)
{
if (m_still_running == 0)
{
m_status = Async::READY;
}
else
{
update_timeout();
}
}
else
{
MXB_ERROR("curl_multi_perform() failed: %s", curl_multi_strerror(rv_curl));
m_status = Async::ERROR;
}
}
}
if (m_status == Async::READY)
{
mxb_assert(m_still_running == 0);
int nRemaining = 0;
do
{
CURLMsg* pMsg = curl_multi_info_read(m_pCurlm, &nRemaining);
if (pMsg && (pMsg->msg == CURLMSG_DONE))
{
CURL* pCurl = pMsg->easy_handle;
auto it = m_curls.find(pCurl);
mxb_assert(it != m_curls.end());
auto& context = it->second;
Result* pResult = context.pResult;
Errbuf* pErrbuf = context.pErrbuf;
long code;
curl_easy_getinfo(pCurl, CURLINFO_RESPONSE_CODE, &code);
pResult->code = code;
if ((code == 0) && pResult->body.empty())
{
pResult->body = pErrbuf->data();
}
m_curls.erase(it);
curl_multi_remove_handle(m_pCurlm, pCurl);
curl_easy_cleanup(pCurl);
}
}
while (nRemaining != 0);
}
}
}
return m_status;
}
long wait_no_more_than() const
{
return m_wait_no_more_than;
}
const std::vector<Result>& results() const
{
return m_results;
}
private:
void update_timeout()
{
curl_multi_timeout(m_pCurlm, &m_wait_no_more_than);
if (m_wait_no_more_than < 0)
{
// No default value, we'll use 100ms as default.
m_wait_no_more_than = 100;
}
}
private:
CURLM* m_pCurlm;
Async::status_t m_status;
vector<Result> m_results;
vector<array<char, CURL_ERROR_SIZE + 1>> m_errbufs;
unordered_map<CURL*, Context> m_curls;
int m_still_running;
long m_wait_no_more_than;
};
}
namespace maxbase
{
namespace http
{
Async::Imp::~Imp()
{
}
Async::Async()
: m_sImp(std::make_shared<ErrorImp>())
{
}
Async get_async(const std::vector<std::string>& urls,
const Config& config)
{
return get_async(urls, "", "", config);
}
Async get_async(const std::vector<std::string>& urls,
const std::string& user, const std::string& password,
const Config& config)
{
shared_ptr<Async::Imp> sImp;
shared_ptr<HttpImp> sHttp_imp = std::make_shared<HttpImp>();
if (sHttp_imp->initialize(urls, user, password, config))
{
sImp = sHttp_imp;
}
else
{
sImp = std::make_shared<ErrorImp>();
}
return Async(sImp);
}
Result get(const std::string& url, const Config& config)
{
return std::move(get(url, "", "", config));
}
Result get(const std::string& url, const std::string& user, const std::string& password, const Config& config)
{
Result res;
char errbuf[CURL_ERROR_SIZE + 1] = "";
CURL* pCurl = get_easy_curl(url, user, password, config, &res, errbuf);
mxb_assert(pCurl);
if (curl_easy_perform(pCurl) == CURLE_OK)
{
long code = 0; // needs to be a long
curl_easy_getinfo(pCurl, CURLINFO_RESPONSE_CODE, &code);
res.code = code;
}
else
{
res.code = -1;
res.body = errbuf;
}
curl_easy_cleanup(pCurl);
return res;
}
vector<Result> get(const std::vector<std::string>& urls, const Config& config)
{
return get(urls, "", "", config);
}
vector<Result> get(const std::vector<std::string>& urls,
@ -381,6 +681,24 @@ vector<Result> get(const std::vector<std::string>& urls,
return results;
}
const char* to_string(Async::status_t status)
{
switch (status)
{
case Async::READY:
return "READY";
case Async::PENDING:
return "PENDING";
case Async::ERROR:
return "ERROR";
}
mxb_assert(!true);
return "Unknown";
}
}
}

View File

@ -15,6 +15,7 @@
#include <chrono>
#include <iostream>
#include <string>
#include <thread>
#include <vector>
#include <maxbase/log.hh>
@ -44,19 +45,15 @@ int test_http()
cout << "error: Exit code not 200 but: " << res.code << endl;
}
return rv;
return rv == EXIT_FAILURE ? 1 : 0;
}
int test_multi_http()
int check_results(const vector<string>& urls,
const vector<bool>& expected_successes,
const vector<mxb::http::Result> results)
{
cout << __func__ << endl;
int rv = EXIT_SUCCESS;
vector<string> urls = { "http://www.example.com/", "http://www.example.com/", "http://non-existent.xyz" };
vector<bool> expected_successes = { true, true, false };
vector<mxb::http::Result> results = mxb::http::get(urls);
for (size_t i = 0; i < urls.size(); ++i)
{
const auto& url = urls[i];
@ -71,7 +68,7 @@ int test_multi_http()
{
if (res.headers.count("Date"))
{
cout << "The date is: " << res.headers["Date"] << endl;
cout << "The date is: " << res.headers.at("Date") << endl;
}
else
{
@ -95,6 +92,56 @@ int test_multi_http()
return rv;
}
int test_multi_http()
{
cout << __func__ << endl;
vector<string> urls = { "http://www.example.com/", "http://www.example.com/", "http://non-existent.xyz" };
vector<bool> expected_successes = { true, true, false };
vector<mxb::http::Result> results = mxb::http::get(urls);
int rv = check_results(urls, expected_successes, results);
return rv == EXIT_FAILURE ? 1 : 0;
}
int test_async_http()
{
cout << __func__ << endl;
int rv = EXIT_FAILURE;
vector<string> urls = { "http://www.example.com/", "http://www.example.com/", "http://non-existent.xyz" };
vector<bool> expected_successes = { true, true, false };
mxb::http::Async http = mxb::http::get_async(urls);
while (http.perform(0) == mxb::http::Async::PENDING)
{
long ms = http.wait_no_more_than();
if (ms > 100)
{
ms = 100;
}
std::this_thread::sleep_for(std::chrono::milliseconds(ms));
}
if (http.status() == mxb::http::Async::READY)
{
const vector<mxb::http::Result>& results = http.results();
rv = check_results(urls, expected_successes, results);
}
else
{
cout << "http::Async: " << to_string(http.status()) << endl;
rv = EXIT_FAILURE;
}
return rv == EXIT_FAILURE ? 1 : 0;
}
}
uint64_t time_since_epoch_ms()
@ -107,12 +154,15 @@ uint64_t time_since_epoch_ms()
int main()
{
int rv = EXIT_SUCCESS;
int rv = 0;
mxb::Log log;
auto start = time_since_epoch_ms();
long start;
long stop;
start = time_since_epoch_ms();
rv += test_http();
auto stop = time_since_epoch_ms();
stop = time_since_epoch_ms();
cout << "Single: " << stop - start << endl;
start = time_since_epoch_ms();
@ -120,5 +170,10 @@ int main()
stop = time_since_epoch_ms();
cout << "Multi: " << stop - start << endl;
start = time_since_epoch_ms();
rv += test_async_http();
stop = time_since_epoch_ms();
cout << "Async: " << stop - start << endl;
return rv;
}