From 847f53b21bb7c2849f01f98dd9d59376d4a6bc16 Mon Sep 17 00:00:00 2001 From: Johan Wikman Date: Mon, 17 Dec 2018 15:45:56 +0200 Subject: [PATCH] MXS-2208 Introduce asynchronous GET A multi HTTP GET can now be performed so that the caller drives the polling of results. This can now be used so that inside a monitor worker, the delays are handled using delayed calls. Consequently, the monitor worker event loop can remain responsive even though the Clustrix nodes are being polled. --- maxutils/maxbase/include/maxbase/http.hh | 157 ++++++++- maxutils/maxbase/src/http.cc | 414 ++++++++++++++++++++--- maxutils/maxbase/src/test/test_http.cc | 79 ++++- 3 files changed, 589 insertions(+), 61 deletions(-) diff --git a/maxutils/maxbase/include/maxbase/http.hh b/maxutils/maxbase/include/maxbase/http.hh index 0010fb813..1eb69c3d6 100644 --- a/maxutils/maxbase/include/maxbase/http.hh +++ b/maxutils/maxbase/include/maxbase/http.hh @@ -91,7 +91,162 @@ std::vector get(const std::vector& urls, std::vector get(const std::vector& urls, const std::string& user, const std::string& password, const Config& config = Config()); -} + +/** + * @class mxb::http::Async + * + * Class for performing multiple HTTP GETs concurrently and asynchronously. + * The instance should be viewed as a handle to the operation. If it is + * copied, both instances refer to the same operation and both instances + * can be used for driving the GET. However, an instance can *only* be + * used or copied in the thread where it is created. + */ +class Async +{ +public: + enum status_t + { + READY, // The result is ready. + ERROR, // The operation has failed. + PENDING // The operation is pending. + }; + + class Imp + { + public: + virtual ~Imp(); + virtual status_t status() const = 0; + + virtual status_t perform(long timeout_ms) = 0; + + virtual long wait_no_more_than() const = 0; + + virtual const std::vector& results() const = 0; + }; + + /** + * Defalt constructor creates an asynchronous operation whose status is ERROR. + */ + Async(); + + /** + * Copy constructor; creates an instance that refers to the same operation + * the argument refers to. + * + * @param rhs An existing @c Async instance. + */ + Async(const Async& rhs) + : m_sImp(rhs.m_sImp) + { + } + + /** + * Assigns an asynchronous operation. + * + * @param rhs An other @c Async instance. + * + * @return *this. + */ + Async& operator = (const Async& rhs) + { + std::shared_ptr sImp(rhs.m_sImp); + m_sImp.swap(sImp); + return *this; + } + + /** + * Return the status of the operation. + * + * @return @c READY|ERROR|PENDING + */ + status_t status() const + { + return m_sImp->status(); + } + + /** + * Performs a step in the operation. + * + * @param timeout_ms The maximum timeout for waiting for activity on the + * underlying socket descriptors. + * + * @return @c READY|ERROR|PENDING + */ + status_t perform(long timeout_ms = 0) + { + return m_sImp->perform(timeout_ms); + } + + /** + * How much time to wait at most, before calling perform() again. + * + * This value is dependent upon the timeouts that were specified when + * the operation was initiated. To ensure that operations are not timed + * out, do not wait as long as this function returns but significantly + * less. + * + * @return Maximum time to wait in milliseconds. + */ + long wait_no_more_than() const + { + return m_sImp->wait_no_more_than(); + } + + /** + * The result of each operation. This function should not be called + * before the status is READY. + * + * @return Vector of results. + */ + const std::vector& results() const + { + return m_sImp->results(); + } + +public: + Async(const std::shared_ptr& sImp) + : m_sImp(sImp) + { + } + +private: + std::shared_ptr m_sImp; +}; + +/** + * Return human-readable string for a status value. + * + * @param status A status value. + * + * @return The corresponding string. + */ +const char* to_string(Async::status_t status); + +/** + * Do a HTTP GET, asynchronously. + * + * @param urls The URLs to GET. + * @param config The config to use. + * + * @return An Async instance using which the operation can be performed. + */ +Async get_async(const std::vector& urls, + const Config& config = Config()); + +/** + * Do a HTTP GET, asynchronously. + * + * @param urls The URLs to GET. + * @param user Username to use. + * @param password Password for the user. + * @param config The config to use. + * + * @return An Async instance using which the operation can be performed. + */ +Async get_async(const std::vector& urls, + const std::string& user, const std::string& password, + const Config& config = Config()); } +} diff --git a/maxutils/maxbase/src/http.cc b/maxutils/maxbase/src/http.cc index 64e921cb1..1e8658088 100644 --- a/maxutils/maxbase/src/http.cc +++ b/maxutils/maxbase/src/http.cc @@ -14,11 +14,12 @@ #include #include #include +#include #include +#include #include #include #include -#include using namespace std; using std::array; @@ -128,53 +129,6 @@ CURL* get_easy_curl(const std::string& url, return pCurl; } -} - - -namespace maxbase -{ - -namespace http -{ - -Result get(const std::string& url, const Config& config) -{ - return std::move(get(url, "", "", config)); -} - -Result get(const std::string& url, const std::string& user, const std::string& password, const Config& config) -{ - Result res; - char errbuf[CURL_ERROR_SIZE + 1] = ""; - CURL* pCurl = get_easy_curl(url, user, password, config, &res, errbuf); - mxb_assert(pCurl); - - - if (curl_easy_perform(pCurl) == CURLE_OK) - { - long code = 0; // needs to be a long - curl_easy_getinfo(pCurl, CURLINFO_RESPONSE_CODE, &code); - res.code = code; - } - else - { - res.code = -1; - res.body = errbuf; - } - - curl_easy_cleanup(pCurl); - - return res; -} - -vector get(const std::vector& urls, const Config& config) -{ - return get(urls, "", "", config); -} - -namespace -{ - using Errbuf = array; struct Context @@ -290,6 +244,11 @@ void execute(CURLM* pCurlm, curl_easy_getinfo(pCurl, CURLINFO_RESPONSE_CODE, &code); pResult->code = code; + if ((code == 0) && pResult->body.empty()) + { + pResult->body = pErrbuf->data(); + } + curls.erase(it); curl_multi_remove_handle(pCurlm, pCurl); curl_easy_cleanup(pCurl); @@ -356,6 +315,347 @@ vector get_curlm(CURLM* pCurlm, return results; } +class ErrorImp : public Async::Imp +{ +public: + ErrorImp() + { + } + + Async::status_t status() const + { + return Async::ERROR; + } + + Async::status_t perform(long timeout_ms) + { + return Async::ERROR; + } + + long wait_no_more_than() const + { + return 0; + } + + const std::vector& results() const + { + return m_results; + } + +private: + vector m_results; +}; + +class HttpImp : public Async::Imp +{ +public: + HttpImp() + : m_pCurlm(curl_multi_init()) + , m_status(Async::ERROR) + , m_still_running(0) + , m_wait_no_more_than(0) + { + mxb_assert(m_pCurlm); + if (!m_pCurlm) + { + throw std::bad_alloc(); + } + } + + ~HttpImp() + { + mxb_assert(m_pCurlm); + + for (auto& item : m_curls) + { + CURL* pCurl = item.first; + MXB_AT_DEBUG(CURLMcode rv =) curl_multi_remove_handle(m_pCurlm, pCurl); + mxb_assert(rv == CURLM_OK); + curl_easy_cleanup(pCurl); + } + + CURLMcode code = curl_multi_cleanup(m_pCurlm); + if (code != CURLM_OK) + { + MXB_ERROR("curl_multi_cleanup() failed: %s", curl_multi_strerror(code)); + } + } + + bool initialize(const std::vector& urls, + const std::string& user, const std::string& password, + const Config& config) + { + mxb_assert(m_status == Async::ERROR); + + m_results.reserve(urls.size()); + m_errbufs.reserve(urls.size()); + + size_t i; + for (i = 0; i < urls.size(); ++i) + { + m_results.resize(i + 1); + m_errbufs.resize(i + 1); + + CURL* pCurl = get_easy_curl(urls[i], user, password, config, &m_results[i], m_errbufs[i].data()); + + if (!pCurl || (curl_multi_add_handle(m_pCurlm, pCurl) != CURLM_OK)) + { + mxb_assert(!true); + if (pCurl) + { + curl_easy_cleanup(pCurl); + } + m_results.resize(m_results.size() - 1); + break; + } + else + { + m_curls.insert(std::make_pair(pCurl, Context(&m_results[i], &m_errbufs[i]))); + } + } + + if (m_results.size() == urls.size()) + { + CURLMcode rv = curl_multi_perform(m_pCurlm, &m_still_running); + + if (rv == CURLM_OK) + { + if (m_still_running == 0) + { + m_status = Async::READY; + m_wait_no_more_than = 0; + } + else + { + update_timeout(); + m_status = Async::PENDING; + } + } + else + { + MXB_ERROR("curl_multi_perform() failed: %s", curl_multi_strerror(rv)); + m_status = Async::ERROR; + } + } + + return m_status != Async::ERROR; + } + + Async::status_t status() const override + { + return m_status; + } + + Async::status_t perform(long timeout_ms) override + { + switch (m_status) + { + case Async::READY: + break; + + case Async::ERROR: + mxb_assert(!true); + break; + + default: + { + mxb_assert(m_status == Async::PENDING); + + fd_set fdread; + fd_set fdwrite; + fd_set fdexcep; + + FD_ZERO(&fdread); + FD_ZERO(&fdwrite); + FD_ZERO(&fdexcep); + + int maxfd; + CURLMcode rv_curl = curl_multi_fdset(m_pCurlm, &fdread, &fdwrite, &fdexcep, &maxfd); + + if (rv_curl == CURLM_OK) + { + int rv = 0; + if (maxfd != -1) + { + struct timeval timeout = { timeout_ms / 1000, (timeout_ms % 1000) * 1000 }; + rv = select(maxfd + 1, &fdread, &fdwrite, &fdexcep, &timeout); + } + + switch (rv) + { + case -1: + mxb_assert(!true); + MXB_ERROR("select() failed: %s", mxb_strerror(errno)); + m_status = Async::ERROR; + break; + + case 0: + default: + rv_curl = curl_multi_perform(m_pCurlm, &m_still_running); + if (rv_curl == CURLM_OK) + { + if (m_still_running == 0) + { + m_status = Async::READY; + } + else + { + update_timeout(); + } + } + else + { + MXB_ERROR("curl_multi_perform() failed: %s", curl_multi_strerror(rv_curl)); + m_status = Async::ERROR; + } + } + } + + if (m_status == Async::READY) + { + mxb_assert(m_still_running == 0); + int nRemaining = 0; + do + { + CURLMsg* pMsg = curl_multi_info_read(m_pCurlm, &nRemaining); + if (pMsg && (pMsg->msg == CURLMSG_DONE)) + { + CURL* pCurl = pMsg->easy_handle; + auto it = m_curls.find(pCurl); + mxb_assert(it != m_curls.end()); + + auto& context = it->second; + Result* pResult = context.pResult; + Errbuf* pErrbuf = context.pErrbuf; + + long code; + curl_easy_getinfo(pCurl, CURLINFO_RESPONSE_CODE, &code); + pResult->code = code; + + if ((code == 0) && pResult->body.empty()) + { + pResult->body = pErrbuf->data(); + } + + m_curls.erase(it); + curl_multi_remove_handle(m_pCurlm, pCurl); + curl_easy_cleanup(pCurl); + } + } + while (nRemaining != 0); + } + } + } + + return m_status; + } + + long wait_no_more_than() const + { + return m_wait_no_more_than; + } + + const std::vector& results() const + { + return m_results; + } + +private: + void update_timeout() + { + curl_multi_timeout(m_pCurlm, &m_wait_no_more_than); + if (m_wait_no_more_than < 0) + { + // No default value, we'll use 100ms as default. + m_wait_no_more_than = 100; + } + } + +private: + CURLM* m_pCurlm; + Async::status_t m_status; + vector m_results; + vector> m_errbufs; + unordered_map m_curls; + int m_still_running; + long m_wait_no_more_than; +}; + +} + + +namespace maxbase +{ + +namespace http +{ + +Async::Imp::~Imp() +{ +} + +Async::Async() + : m_sImp(std::make_shared()) +{ +} + +Async get_async(const std::vector& urls, + const Config& config) +{ + return get_async(urls, "", "", config); +} + +Async get_async(const std::vector& urls, + const std::string& user, const std::string& password, + const Config& config) +{ + shared_ptr sImp; + shared_ptr sHttp_imp = std::make_shared(); + if (sHttp_imp->initialize(urls, user, password, config)) + { + sImp = sHttp_imp; + } + else + { + sImp = std::make_shared(); + } + + return Async(sImp); +} + +Result get(const std::string& url, const Config& config) +{ + return std::move(get(url, "", "", config)); +} + +Result get(const std::string& url, const std::string& user, const std::string& password, const Config& config) +{ + Result res; + char errbuf[CURL_ERROR_SIZE + 1] = ""; + CURL* pCurl = get_easy_curl(url, user, password, config, &res, errbuf); + mxb_assert(pCurl); + + + if (curl_easy_perform(pCurl) == CURLE_OK) + { + long code = 0; // needs to be a long + curl_easy_getinfo(pCurl, CURLINFO_RESPONSE_CODE, &code); + res.code = code; + } + else + { + res.code = -1; + res.body = errbuf; + } + + curl_easy_cleanup(pCurl); + + return res; +} + +vector get(const std::vector& urls, const Config& config) +{ + return get(urls, "", "", config); } vector get(const std::vector& urls, @@ -381,6 +681,24 @@ vector get(const std::vector& urls, return results; } +const char* to_string(Async::status_t status) +{ + switch (status) + { + case Async::READY: + return "READY"; + + case Async::PENDING: + return "PENDING"; + + case Async::ERROR: + return "ERROR"; + } + + mxb_assert(!true); + return "Unknown"; +} + } } diff --git a/maxutils/maxbase/src/test/test_http.cc b/maxutils/maxbase/src/test/test_http.cc index f97392769..3b0110ae3 100644 --- a/maxutils/maxbase/src/test/test_http.cc +++ b/maxutils/maxbase/src/test/test_http.cc @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -44,19 +45,15 @@ int test_http() cout << "error: Exit code not 200 but: " << res.code << endl; } - return rv; + return rv == EXIT_FAILURE ? 1 : 0; } -int test_multi_http() +int check_results(const vector& urls, + const vector& expected_successes, + const vector results) { - cout << __func__ << endl; - int rv = EXIT_SUCCESS; - vector urls = { "http://www.example.com/", "http://www.example.com/", "http://non-existent.xyz" }; - vector expected_successes = { true, true, false }; - vector results = mxb::http::get(urls); - for (size_t i = 0; i < urls.size(); ++i) { const auto& url = urls[i]; @@ -71,7 +68,7 @@ int test_multi_http() { if (res.headers.count("Date")) { - cout << "The date is: " << res.headers["Date"] << endl; + cout << "The date is: " << res.headers.at("Date") << endl; } else { @@ -95,6 +92,56 @@ int test_multi_http() return rv; } +int test_multi_http() +{ + cout << __func__ << endl; + + vector urls = { "http://www.example.com/", "http://www.example.com/", "http://non-existent.xyz" }; + vector expected_successes = { true, true, false }; + vector results = mxb::http::get(urls); + + int rv = check_results(urls, expected_successes, results); + + return rv == EXIT_FAILURE ? 1 : 0; +} + +int test_async_http() +{ + cout << __func__ << endl; + + int rv = EXIT_FAILURE; + + vector urls = { "http://www.example.com/", "http://www.example.com/", "http://non-existent.xyz" }; + vector expected_successes = { true, true, false }; + mxb::http::Async http = mxb::http::get_async(urls); + + while (http.perform(0) == mxb::http::Async::PENDING) + { + long ms = http.wait_no_more_than(); + + if (ms > 100) + { + ms = 100; + } + + std::this_thread::sleep_for(std::chrono::milliseconds(ms)); + } + + if (http.status() == mxb::http::Async::READY) + { + const vector& results = http.results(); + + rv = check_results(urls, expected_successes, results); + } + else + { + cout << "http::Async: " << to_string(http.status()) << endl; + rv = EXIT_FAILURE; + } + + return rv == EXIT_FAILURE ? 1 : 0; +} + } uint64_t time_since_epoch_ms() @@ -107,12 +154,15 @@ uint64_t time_since_epoch_ms() int main() { - int rv = EXIT_SUCCESS; + int rv = 0; mxb::Log log; - auto start = time_since_epoch_ms(); + long start; + long stop; + + start = time_since_epoch_ms(); rv += test_http(); - auto stop = time_since_epoch_ms(); + stop = time_since_epoch_ms(); cout << "Single: " << stop - start << endl; start = time_since_epoch_ms(); @@ -120,5 +170,10 @@ int main() stop = time_since_epoch_ms(); cout << "Multi: " << stop - start << endl; + start = time_since_epoch_ms(); + rv += test_async_http(); + stop = time_since_epoch_ms(); + cout << "Async: " << stop - start << endl; + return rv; }