MXS-2208 Implement synchronous GET in terms of asynch

This commit is contained in:
Johan Wikman
2018-12-17 15:59:12 +02:00
parent 847f53b21b
commit 0d733bb1bb

View File

@ -14,9 +14,11 @@
#include <maxbase/http.hh>
#include <algorithm>
#include <array>
#include <chrono>
#include <map>
#include <unordered_map>
#include <utility>
#include <thread>
#include <curl/curl.h>
#include <maxbase/assert.h>
#include <maxbase/string.hh>
@ -144,177 +146,6 @@ struct Context
Errbuf * pErrbuf;
};
void execute(CURLM* pCurlm,
const Config& config,
unordered_map<CURL*, Context>& curls)
{
int timeout_ms = (config.connect_timeout_s + config.timeout_s) * 1000;
int still_running {0};
int numfs {0};
int repeats {0};
CURLMcode rv = curl_multi_perform(pCurlm, &still_running);
if (rv == CURLM_OK)
{
while ((rv == CURLM_OK) && (still_running != 0))
{
fd_set fdread;
fd_set fdwrite;
fd_set fdexcep;
FD_ZERO(&fdread);
FD_ZERO(&fdwrite);
FD_ZERO(&fdexcep);
long default_timeout = 100; // 100ms
long curl_timeout = -1;
curl_multi_timeout(pCurlm, &curl_timeout);
if ((curl_timeout >= 0) && (curl_timeout < default_timeout))
{
default_timeout = curl_timeout;
}
struct timeval timeout = { 0, default_timeout };
int maxfd;
rv = curl_multi_fdset(pCurlm, &fdread, &fdwrite, &fdexcep, &maxfd);
if (rv == CURLM_OK)
{
int rc;
if (maxfd == -1)
{
struct timeval wait = { 0, 100 * 1000 }; /* 100ms */
rc = select(0, NULL, NULL, NULL, &wait);
}
else
{
rc = select(maxfd + 1, &fdread, &fdwrite, &fdexcep, &timeout);
}
switch (rc)
{
case -1:
mxb_assert(!true);
MXB_ERROR("select() failed: %s", mxb_strerror(errno));
rv = CURLM_INTERNAL_ERROR;
break;
case 0:
default:
rv = curl_multi_perform(pCurlm, &still_running);
if (rv != CURLM_OK)
{
MXB_ERROR("curl_multi_perform() failed, error: %d, %s", (int)rv, curl_multi_strerror(rv));
}
}
}
else
{
MXB_ERROR("curl_multi_fdset() failed, error: %d, %s", (int)rv, curl_multi_strerror(rv));
}
}
}
else
{
MXB_ERROR("curl_multi_perform() failed, error: %d, %s", (int)rv, curl_multi_strerror(rv));
}
if (rv == CURLM_OK)
{
int nRemaining = 0;
do
{
CURLMsg* pMsg = curl_multi_info_read(pCurlm, &nRemaining);
if (pMsg && (pMsg->msg == CURLMSG_DONE))
{
CURL* pCurl = pMsg->easy_handle;
auto it = curls.find(pCurl);
mxb_assert(it != curls.end());
auto& context = it->second;
Result* pResult = context.pResult;
Errbuf* pErrbuf = context.pErrbuf;
long code;
curl_easy_getinfo(pCurl, CURLINFO_RESPONSE_CODE, &code);
pResult->code = code;
if ((code == 0) && pResult->body.empty())
{
pResult->body = pErrbuf->data();
}
curls.erase(it);
curl_multi_remove_handle(pCurlm, pCurl);
curl_easy_cleanup(pCurl);
}
}
while (nRemaining != 0);
}
}
vector<Result> get_curlm(CURLM* pCurlm,
const std::vector<std::string>& urls,
const std::string& user, const std::string& password,
const Config& config)
{
vector<Result> results;
vector<array<char, CURL_ERROR_SIZE + 1>> errbufs;
unordered_map<CURL*, Context> curls;
results.reserve(urls.size());
errbufs.reserve(urls.size());
size_t i;
for (i = 0; i < urls.size(); ++i)
{
results.resize(i + 1);
errbufs.resize(i + 1);
CURL* pCurl = get_easy_curl(urls[i], user, password, config, &results[i], errbufs[i].data());
if (!pCurl || (curl_multi_add_handle(pCurlm, pCurl) != CURLM_OK))
{
mxb_assert(!true);
if (pCurl)
{
curl_easy_cleanup(pCurl);
}
break;
}
else
{
curls.insert(std::make_pair(pCurl, Context(&results[i], &errbufs[i])));
}
}
if (i == urls.size())
{
// Success
execute(pCurlm, config, curls);
}
else
{
--i;
mxb_assert(i == curls.size());
for (auto& item : curls)
{
CURL* pCurl = item.first;
MXB_AT_DEBUG(CURLMcode rv =) curl_multi_remove_handle(pCurlm, pCurl);
mxb_assert(rv == CURLM_OK);
curl_easy_cleanup(pCurl);
}
}
return results;
}
class ErrorImp : public Async::Imp
{
public:
@ -635,7 +466,6 @@ Result get(const std::string& url, const std::string& user, const std::string& p
CURL* pCurl = get_easy_curl(url, user, password, config, &res, errbuf);
mxb_assert(pCurl);
if (curl_easy_perform(pCurl) == CURLE_OK)
{
long code = 0; // needs to be a long
@ -662,20 +492,25 @@ vector<Result> get(const std::vector<std::string>& urls,
const std::string& user, const std::string& password,
const Config& config)
{
vector<Result> results;
results.reserve(urls.size());
Async http = get_async(urls, user, password, config);
CURLM* pCurlm = curl_multi_init();
mxb_assert(pCurlm);
if (pCurlm)
while (http.perform() == Async::PENDING)
{
results = get_curlm(pCurlm, urls, user, password, config);
long ms = http.wait_no_more_than();
if (curl_multi_cleanup(pCurlm) != CURLM_OK)
if (ms > 100)
{
mxb_assert(!true);
ms = 100;
}
std::this_thread::sleep_for(std::chrono::milliseconds(ms));
}
vector<Result> results(http.results());
if (results.size() != urls.size())
{
results.resize(urls.size());
}
return results;