HTTPDownloaderCurl: Switch to multi/async API

This commit is contained in:
Stenzek 2023-11-06 20:49:25 +10:00
parent b459a42fb7
commit 5b00ffb6cb
No known key found for this signature in database
3 changed files with 90 additions and 66 deletions

View file

@ -174,8 +174,12 @@ void HTTPDownloader::WaitForAllRequests()
{ {
std::unique_lock<std::mutex> lock(m_pending_http_request_lock); std::unique_lock<std::mutex> lock(m_pending_http_request_lock);
while (!m_pending_http_requests.empty()) while (!m_pending_http_requests.empty())
{
// Don't burn too much CPU.
Common::Timer::NanoSleep(1000000);
LockedPollRequests(lock); LockedPollRequests(lock);
} }
}
void HTTPDownloader::LockedAddRequest(Request* request) void HTTPDownloader::LockedAddRequest(Request* request)
{ {

View file

@ -19,7 +19,11 @@ HTTPDownloaderCurl::HTTPDownloaderCurl() : HTTPDownloader()
{ {
} }
HTTPDownloaderCurl::~HTTPDownloaderCurl() = default; HTTPDownloaderCurl::~HTTPDownloaderCurl()
{
if (m_multi_handle)
curl_multi_cleanup(m_multi_handle);
}
std::unique_ptr<HTTPDownloader> HTTPDownloader::Create(const char* user_agent) std::unique_ptr<HTTPDownloader> HTTPDownloader::Create(const char* user_agent)
{ {
@ -54,8 +58,14 @@ bool HTTPDownloaderCurl::Initialize(const char* user_agent)
} }
} }
m_multi_handle = curl_multi_init();
if (!m_multi_handle)
{
Log_ErrorPrint("curl_multi_init() failed");
return false;
}
m_user_agent = user_agent; m_user_agent = user_agent;
m_thread_pool = std::make_unique<cb::ThreadPool>(m_max_active_requests);
return true; return true;
} }
@ -70,56 +80,6 @@ size_t HTTPDownloaderCurl::WriteCallback(char* ptr, size_t size, size_t nmemb, v
return nmemb; return nmemb;
} }
void HTTPDownloaderCurl::ProcessRequest(Request* req)
{
std::unique_lock<std::mutex> cancel_lock(m_cancel_mutex);
if (req->closed.load())
return;
cancel_lock.unlock();
// Apparently OpenSSL can fire SIGPIPE...
sigset_t old_block_mask = {};
sigset_t new_block_mask = {};
sigemptyset(&old_block_mask);
sigemptyset(&new_block_mask);
sigaddset(&new_block_mask, SIGPIPE);
if (pthread_sigmask(SIG_BLOCK, &new_block_mask, &old_block_mask) != 0)
Log_WarningPrint("Failed to block SIGPIPE");
req->start_time = Common::Timer::GetCurrentValue();
int ret = curl_easy_perform(req->handle);
if (ret == CURLE_OK)
{
long response_code = 0;
curl_easy_getinfo(req->handle, CURLINFO_RESPONSE_CODE, &response_code);
req->status_code = static_cast<s32>(response_code);
char* content_type = nullptr;
if (!curl_easy_getinfo(req->handle, CURLINFO_CONTENT_TYPE, &content_type) && content_type)
req->content_type = content_type;
Log_DevPrintf("Request for '%s' returned status code %d and %zu bytes", req->url.c_str(), req->status_code,
req->data.size());
}
else
{
Log_ErrorPrintf("Request for '%s' returned %d", req->url.c_str(), ret);
}
curl_easy_cleanup(req->handle);
if (pthread_sigmask(SIG_UNBLOCK, &new_block_mask, &old_block_mask) != 0)
Log_WarningPrint("Failed to unblock SIGPIPE");
cancel_lock.lock();
req->state = Request::State::Complete;
if (req->closed.load())
delete req;
else
req->closed.store(true);
}
HTTPDownloader::Request* HTTPDownloaderCurl::InternalCreateRequest() HTTPDownloader::Request* HTTPDownloaderCurl::InternalCreateRequest()
{ {
Request* req = new Request(); Request* req = new Request();
@ -135,7 +95,62 @@ HTTPDownloader::Request* HTTPDownloaderCurl::InternalCreateRequest()
void HTTPDownloaderCurl::InternalPollRequests() void HTTPDownloaderCurl::InternalPollRequests()
{ {
// noop - uses thread pool // Apparently OpenSSL can fire SIGPIPE...
sigset_t old_block_mask = {};
sigset_t new_block_mask = {};
sigemptyset(&old_block_mask);
sigemptyset(&new_block_mask);
sigaddset(&new_block_mask, SIGPIPE);
if (pthread_sigmask(SIG_BLOCK, &new_block_mask, &old_block_mask) != 0)
Log_WarningPrint("Failed to block SIGPIPE");
int running_handles;
const CURLMcode err = curl_multi_perform(m_multi_handle, &running_handles);
if (err != CURLM_OK)
Log_ErrorFmt("curl_multi_perform() returned {}", static_cast<int>(err));
for (;;)
{
int msgq;
struct CURLMsg* msg = curl_multi_info_read(m_multi_handle, &msgq);
if (!msg)
break;
if (msg->msg != CURLMSG_DONE)
{
Log_WarningFmt("Unexpected multi message {}", static_cast<int>(msg->msg));
continue;
}
Request* req;
if (curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, &req) != CURLE_OK)
{
Log_ErrorPrint("curl_easy_getinfo() failed");
continue;
}
if (msg->data.result == CURLE_OK)
{
long response_code = 0;
curl_easy_getinfo(msg->easy_handle, CURLINFO_RESPONSE_CODE, &response_code);
req->status_code = static_cast<s32>(response_code);
char* content_type = nullptr;
if (curl_easy_getinfo(req->handle, CURLINFO_CONTENT_TYPE, &content_type) == CURLE_OK && content_type)
req->content_type = content_type;
Log_DevFmt("Request for '{}' returned status code {} and {} bytes", req->url, req->status_code, req->data.size());
}
else
{
Log_ErrorFmt("Request for '{}' returned error {}", req->url, static_cast<int>(msg->data.result));
}
req->state.store(Request::State::Complete, std::memory_order_release);
}
if (pthread_sigmask(SIG_UNBLOCK, &new_block_mask, &old_block_mask) != 0)
Log_WarningPrint("Failed to unblock SIGPIPE");
} }
bool HTTPDownloaderCurl::StartRequest(HTTPDownloader::Request* request) bool HTTPDownloaderCurl::StartRequest(HTTPDownloader::Request* request)
@ -146,6 +161,7 @@ bool HTTPDownloaderCurl::StartRequest(HTTPDownloader::Request* request)
curl_easy_setopt(req->handle, CURLOPT_WRITEFUNCTION, &HTTPDownloaderCurl::WriteCallback); curl_easy_setopt(req->handle, CURLOPT_WRITEFUNCTION, &HTTPDownloaderCurl::WriteCallback);
curl_easy_setopt(req->handle, CURLOPT_WRITEDATA, req); curl_easy_setopt(req->handle, CURLOPT_WRITEDATA, req);
curl_easy_setopt(req->handle, CURLOPT_NOSIGNAL, 1); curl_easy_setopt(req->handle, CURLOPT_NOSIGNAL, 1);
curl_easy_setopt(req->handle, CURLOPT_PRIVATE, req);
if (request->type == Request::Type::Post) if (request->type == Request::Type::Post)
{ {
@ -154,18 +170,27 @@ bool HTTPDownloaderCurl::StartRequest(HTTPDownloader::Request* request)
} }
Log_DevPrintf("Started HTTP request for '%s'", req->url.c_str()); Log_DevPrintf("Started HTTP request for '%s'", req->url.c_str());
req->state = Request::State::Started; req->state.store(Request::State::Started, std::memory_order_release);
req->start_time = Common::Timer::GetCurrentValue(); req->start_time = Common::Timer::GetCurrentValue();
m_thread_pool->Schedule(std::bind(&HTTPDownloaderCurl::ProcessRequest, this, req));
const CURLMcode err = curl_multi_add_handle(m_multi_handle, req->handle);
if (err != CURLM_OK)
{
Log_ErrorFmt("curl_multi_add_handle() returned {}", static_cast<int>(err));
req->callback(HTTP_STATUS_ERROR, std::string(), req->data);
curl_easy_cleanup(req->handle);
delete req;
return false;
}
return true; return true;
} }
void HTTPDownloaderCurl::CloseRequest(HTTPDownloader::Request* request) void HTTPDownloaderCurl::CloseRequest(HTTPDownloader::Request* request)
{ {
std::unique_lock<std::mutex> cancel_lock(m_cancel_mutex);
Request* req = static_cast<Request*>(request); Request* req = static_cast<Request*>(request);
if (req->closed.load()) DebugAssert(req->handle);
curl_multi_remove_handle(m_multi_handle, req->handle);
curl_easy_cleanup(req->handle);
delete req; delete req;
else
req->closed.store(true);
} }

View file

@ -4,8 +4,6 @@
#pragma once #pragma once
#include "http_downloader.h" #include "http_downloader.h"
#include "common/thirdparty/thread_pool.h"
#include <atomic> #include <atomic>
#include <curl/curl.h> #include <curl/curl.h>
#include <memory> #include <memory>
@ -29,13 +27,10 @@ private:
struct Request : HTTPDownloader::Request struct Request : HTTPDownloader::Request
{ {
CURL* handle = nullptr; CURL* handle = nullptr;
std::atomic_bool closed{false};
}; };
static size_t WriteCallback(char* ptr, size_t size, size_t nmemb, void* userdata); static size_t WriteCallback(char* ptr, size_t size, size_t nmemb, void* userdata);
void ProcessRequest(Request* req);
CURLM* m_multi_handle = nullptr;
std::string m_user_agent; std::string m_user_agent;
std::unique_ptr<cb::ThreadPool> m_thread_pool;
std::mutex m_cancel_mutex;
}; };