MXS-2208 Add synchronous multi GET

Now possible to GET multiple URLs in one go. Behind the scenes
the getting is done asynchronously and in parallell.
This commit is contained in:
Johan Wikman
2018-12-14 09:39:10 +02:00
parent da3742324a
commit 1d338f3c4a
3 changed files with 376 additions and 28 deletions

View File

@ -16,6 +16,7 @@
#include <map>
#include <memory>
#include <string>
#include <vector>
namespace maxbase
{
@ -31,8 +32,8 @@ enum
struct Config
{
int connect_timeout = DEFAULT_CONNECT_TIMEOUT;
int timeout = DEFAULT_TIMEOUT;
int connect_timeout_s = DEFAULT_CONNECT_TIMEOUT;
int timeout_s = DEFAULT_TIMEOUT;
};
struct Result
@ -45,7 +46,7 @@ struct Result
/**
* Do a HTTP GET, when no user/password is required.
*
* @param url URL to use.
* @param url The URL to GET.
* @param config The config to use.
*
* @return A @c Result.
@ -55,9 +56,9 @@ Result get(const std::string& url, const Config& config = Config());
/**
* Do a HTTP GET
*
* @param url URL to use.
* @param user Username to use, optional.
* @param password Password for the user, optional.
* @param url The URL to GET.
* @param user Username to use.
* @param password Password for the user.
* @param config The config to use.
*
* @return A @c Result.
@ -66,6 +67,30 @@ Result get(const std::string& url,
const std::string& user, const std::string& password,
const Config& config = Config());
/**
* Do a HTTP GET, when no user/password is required.
*
* @param urls The URLs to GET.
* @param config The config to use.
*
* @return A @c Result.
*/
std::vector<Result> get(const std::vector<std::string>& urls,
const Config& config = Config());
/**
* Do a HTTP GET
*
* @param urls The URLs to GET.
* @param user Username to use.
* @param password Password for the user.
* @param config The config to use.
*
* @return Vector of @c Results, as many as there were @c urls.
*/
std::vector<Result> get(const std::vector<std::string>& urls,
const std::string& user, const std::string& password,
const Config& config = Config());
}
}

View File

@ -13,21 +13,33 @@
#include <maxbase/http.hh>
#include <algorithm>
#include <array>
#include <unordered_map>
#include <curl/curl.h>
#include <maxbase/assert.h>
#include <maxbase/string.hh>
#include <iostream>
using namespace std;
using std::array;
using std::map;
using std::string;
using std::unordered_map;
using std::vector;
namespace
{
using namespace mxb;
using namespace mxb::http;
template<class T>
inline void checked_curl_setopt(CURL* pCurl, CURLoption option, T value)
inline int checked_curl_setopt(CURL* pCurl, CURLoption option, T value)
{
MXB_AT_DEBUG(CURLcode rv =) curl_easy_setopt(pCurl, option, value);
CURLcode rv = curl_easy_setopt(pCurl, option, value);
mxb_assert(rv == CURLE_OK);
return rv == CURLE_OK ? 0 : 1;
}
// https://curl.haxx.se/libcurl/c/CURLOPT_WRITEFUNCTION.html
@ -73,6 +85,49 @@ size_t header_callback(char* ptr, size_t size, size_t nmemb, void* userdata)
return len;
}
CURL* get_easy_curl(const std::string& url,
const std::string& user, const std::string& password,
const Config& config,
Result *pRes,
char* pErrbuf)
{
CURL* pCurl = curl_easy_init();
mxb_assert(pCurl);
if (pCurl)
{
checked_curl_setopt(pCurl, CURLOPT_NOSIGNAL, 1);
checked_curl_setopt(pCurl, CURLOPT_CONNECTTIMEOUT, config.connect_timeout_s); // For connection phase
checked_curl_setopt(pCurl, CURLOPT_TIMEOUT, config.timeout_s); // For data transfer phase
checked_curl_setopt(pCurl, CURLOPT_ERRORBUFFER, pErrbuf);
checked_curl_setopt(pCurl, CURLOPT_WRITEFUNCTION, write_callback);
checked_curl_setopt(pCurl, CURLOPT_WRITEDATA, &pRes->body);
checked_curl_setopt(pCurl, CURLOPT_URL, url.c_str());
checked_curl_setopt(pCurl, CURLOPT_HEADERFUNCTION, header_callback);
checked_curl_setopt(pCurl, CURLOPT_HEADERDATA, &pRes->headers);
if (!user.empty() && !password.empty())
{
// In release mode we will silently ignore the unlikely event that the escaping fails.
char* zU = curl_easy_escape(pCurl, user.c_str(), user.length());
mxb_assert(zU);
char* zP = curl_easy_escape(pCurl, password.c_str(), password.length());
mxb_assert(zP);
string u(zU ? zU : user);
string p(zP ? zP : password);
curl_free(zU);
curl_free(zP);
checked_curl_setopt(pCurl, CURLOPT_HTTPAUTH, CURLAUTH_BASIC);
checked_curl_setopt(pCurl, CURLOPT_USERPWD, (u + ":" + p).c_str());
}
}
return pCurl;
}
}
@ -91,28 +146,13 @@ Result get(const std::string& url, const std::string& user, const std::string& p
{
Result res;
char errbuf[CURL_ERROR_SIZE + 1] = "";
CURL* pCurl = curl_easy_init();
CURL* pCurl = get_easy_curl(url, user, password, config, &res, errbuf);
mxb_assert(pCurl);
checked_curl_setopt(pCurl, CURLOPT_NOSIGNAL, 1);
checked_curl_setopt(pCurl, CURLOPT_CONNECTTIMEOUT, config.connect_timeout); // For connection phase
checked_curl_setopt(pCurl, CURLOPT_TIMEOUT, config.timeout); // For data transfer phase
checked_curl_setopt(pCurl, CURLOPT_ERRORBUFFER, errbuf);
checked_curl_setopt(pCurl, CURLOPT_WRITEFUNCTION, write_callback);
checked_curl_setopt(pCurl, CURLOPT_WRITEDATA, &res.body);
checked_curl_setopt(pCurl, CURLOPT_URL, url.c_str());
checked_curl_setopt(pCurl, CURLOPT_HEADERFUNCTION, header_callback);
checked_curl_setopt(pCurl, CURLOPT_HEADERDATA, &res.headers);
if (!user.empty() && !password.empty())
{
checked_curl_setopt(pCurl, CURLOPT_HTTPAUTH, CURLAUTH_BASIC);
checked_curl_setopt(pCurl, CURLOPT_USERPWD, (user + ":" + password).c_str());
}
long code = 0; // needs to be a long
if (curl_easy_perform(pCurl) == CURLE_OK)
{
long code = 0; // needs to be a long
curl_easy_getinfo(pCurl, CURLINFO_RESPONSE_CODE, &code);
res.code = code;
}
@ -124,7 +164,221 @@ Result get(const std::string& url, const std::string& user, const std::string& p
curl_easy_cleanup(pCurl);
return std::move(res);
return res;
}
vector<Result> get(const std::vector<std::string>& urls, const Config& config)
{
return get(urls, "", "", config);
}
namespace
{
using Errbuf = array<char, CURL_ERROR_SIZE + 1>;
struct Context
{
Context(Result* pResult,
Errbuf* pErrbuf)
: pResult(pResult)
, pErrbuf(pErrbuf)
{
}
mxb::http::Result* pResult;
Errbuf * pErrbuf;
};
void execute(CURLM* pCurlm,
const Config& config,
unordered_map<CURL*, Context>& curls)
{
int timeout_ms = (config.connect_timeout_s + config.timeout_s) * 1000;
int still_running {0};
int numfs {0};
int repeats {0};
CURLMcode rv = curl_multi_perform(pCurlm, &still_running);
if (rv == CURLM_OK)
{
while ((rv == CURLM_OK) && (still_running != 0))
{
fd_set fdread;
fd_set fdwrite;
fd_set fdexcep;
FD_ZERO(&fdread);
FD_ZERO(&fdwrite);
FD_ZERO(&fdexcep);
long default_timeout = 100; // 100ms
long curl_timeout = -1;
curl_multi_timeout(pCurlm, &curl_timeout);
if ((curl_timeout >= 0) && (curl_timeout < default_timeout))
{
default_timeout = curl_timeout;
}
struct timeval timeout = { 0, default_timeout };
int maxfd;
rv = curl_multi_fdset(pCurlm, &fdread, &fdwrite, &fdexcep, &maxfd);
if (rv == CURLM_OK)
{
int rc;
if (maxfd == -1)
{
rc = select(maxfd + 1, &fdread, &fdwrite, &fdexcep, &timeout);
}
else
{
struct timeval wait = { 0, 100 * 1000 }; /* 100ms */
rc = select(0, NULL, NULL, NULL, &wait);
}
switch (rc)
{
case -1:
mxb_assert(!true);
MXB_ERROR("select() failed: %s", mxb_strerror(errno));
rv = CURLM_INTERNAL_ERROR;
break;
case 0:
default:
rv = curl_multi_perform(pCurlm, &still_running);
if (rv != CURLM_OK)
{
MXB_ERROR("curl_multi_perform() failed, error: %d, %s", (int)rv, curl_multi_strerror(rv));
}
}
}
else
{
MXB_ERROR("curl_multi_fdset() failed, error: %d, %s", (int)rv, curl_multi_strerror(rv));
}
}
}
else
{
MXB_ERROR("curl_multi_perform() failed, error: %d, %s", (int)rv, curl_multi_strerror(rv));
}
if (rv == CURLM_OK)
{
int nRemaining = 0;
do
{
CURLMsg* pMsg = curl_multi_info_read(pCurlm, &nRemaining);
if (pMsg && (pMsg->msg == CURLMSG_DONE))
{
CURL* pCurl = pMsg->easy_handle;
auto it = curls.find(pCurl);
mxb_assert(it != curls.end());
auto& context = it->second;
Result* pResult = context.pResult;
Errbuf* pErrbuf = context.pErrbuf;
long code;
curl_easy_getinfo(pCurl, CURLINFO_RESPONSE_CODE, &code);
pResult->code = code;
curls.erase(it);
curl_multi_remove_handle(pCurlm, pCurl);
curl_easy_cleanup(pCurl);
}
}
while (nRemaining != 0);
}
}
vector<Result> get_curlm(CURLM* pCurlm,
const std::vector<std::string>& urls,
const std::string& user, const std::string& password,
const Config& config)
{
vector<Result> results;
vector<array<char, CURL_ERROR_SIZE + 1>> errbufs;
unordered_map<CURL*, Context> curls;
results.reserve(urls.size());
errbufs.reserve(urls.size());
size_t i;
for (i = 0; i < urls.size(); ++i)
{
results.resize(i + 1);
errbufs.resize(i + 1);
CURL* pCurl = get_easy_curl(urls[i], user, password, config, &results[i], errbufs[i].data());
if (!pCurl || (curl_multi_add_handle(pCurlm, pCurl) != CURLM_OK))
{
mxb_assert(!true);
if (pCurl)
{
curl_easy_cleanup(pCurl);
}
break;
}
else
{
curls.insert(std::make_pair(pCurl, Context(&results[i], &errbufs[i])));
}
}
if (i == urls.size())
{
// Success
execute(pCurlm, config, curls);
}
else
{
--i;
mxb_assert(i == curls.size());
for (auto& item : curls)
{
CURL* pCurl = item.first;
MXB_AT_DEBUG(CURLMcode rv =) curl_multi_remove_handle(pCurlm, pCurl);
mxb_assert(rv == CURLM_OK);
curl_easy_cleanup(pCurl);
}
}
return results;
}
}
vector<Result> get(const std::vector<std::string>& urls,
const std::string& user, const std::string& password,
const Config& config)
{
vector<Result> results;
results.reserve(urls.size());
CURLM* pCurlm = curl_multi_init();
mxb_assert(pCurlm);
if (pCurlm)
{
results = get_curlm(pCurlm, urls, user, password, config);
if (curl_multi_cleanup(pCurlm) != CURLM_OK)
{
mxb_assert(!true);
}
}
return results;
}
}

View File

@ -12,7 +12,10 @@
*/
#include <maxbase/http.hh>
#include <chrono>
#include <iostream>
#include <string>
#include <vector>
#include <maxbase/log.hh>
using namespace std;
@ -22,6 +25,8 @@ namespace
int test_http()
{
cout << __func__ << endl;
int rv = EXIT_FAILURE;
auto res = mxb::http::get("http://www.example.com/");
@ -42,14 +47,78 @@ int test_http()
return rv;
}
int test_multi_http()
{
cout << __func__ << endl;
int rv = EXIT_SUCCESS;
vector<string> urls = { "http://www.example.com/", "http://www.example.com/", "http://non-existent.xyz" };
vector<bool> expected_successes = { true, true, false };
vector<mxb::http::Result> results = mxb::http::get(urls);
for (size_t i = 0; i < urls.size(); ++i)
{
const auto& url = urls[i];
auto& res = results[i];
bool expected_success = expected_successes[i];
cout << url << " responded with: " << res.code << endl;
if (expected_success)
{
if (res.code == 200)
{
if (res.headers.count("Date"))
{
cout << "The date is: " << res.headers["Date"] << endl;
}
else
{
rv = EXIT_FAILURE;
}
}
else
{
rv = EXIT_FAILURE;
}
}
else
{
if (res.code != 0)
{
rv = EXIT_FAILURE;
}
}
}
return rv;
}
}
uint64_t time_since_epoch_ms()
{
auto now = std::chrono::system_clock::now();
auto duration = now.time_since_epoch();
return std::chrono::duration_cast<std::chrono::milliseconds>(duration).count();
}
int main()
{
int rv = EXIT_SUCCESS;
mxb::Log log;
rv = test_http();
auto start = time_since_epoch_ms();
rv += test_http();
auto stop = time_since_epoch_ms();
cout << "Single: " << stop - start << endl;
start = time_since_epoch_ms();
rv += test_multi_http();
stop = time_since_epoch_ms();
cout << "Multi: " << stop - start << endl;
return rv;
}