Improved multithreading for the network code

This commit is contained in:
Leon Styhre 2024-04-08 19:26:08 +02:00
parent d66a2f0e08
commit f839ced47b
5 changed files with 222 additions and 119 deletions

View file

@ -1,6 +1,6 @@
// SPDX-License-Identifier: MIT // SPDX-License-Identifier: MIT
// //
// ES-DE is a frontend for browsing and launching games from your multi-platform game collection. // ES-DE is a frontend for browsing and launching games from your multi-platform collection.
// //
// The column limit is 100 characters. // The column limit is 100 characters.
// All ES-DE C++ source code is formatted using clang-format. // All ES-DE C++ source code is formatted using clang-format.
@ -1121,6 +1121,8 @@ int main(int argc, char* argv[])
#if defined(APPLICATION_UPDATER) #if defined(APPLICATION_UPDATER)
if (ApplicationUpdater::getInstance().getResults()) if (ApplicationUpdater::getInstance().getResults())
ViewController::getInstance()->updateAvailableDialog(); ViewController::getInstance()->updateAvailableDialog();
else
HttpReq::cleanupCurlMulti();
#endif #endif
#if defined(_WIN64) #if defined(_WIN64)

View file

@ -1,6 +1,6 @@
// SPDX-License-Identifier: MIT // SPDX-License-Identifier: MIT
// //
// ES-DE // ES-DE Frontend
// ViewController.cpp // ViewController.cpp
// //
// Handles overall system navigation including animations and transitions. // Handles overall system navigation including animations and transitions.
@ -417,7 +417,12 @@ void ViewController::updateAvailableDialog()
0.535f * (1.778f / mRenderer->getScreenAspectRatio())))); 0.535f * (1.778f / mRenderer->getScreenAspectRatio()))));
} }
}, },
"CANCEL", [] { return; }, "", nullptr, nullptr, true, true, "CANCEL",
[] {
HttpReq::cleanupCurlMulti();
return;
},
"", nullptr, nullptr, true, true,
(mRenderer->getIsVerticalOrientation() ? (mRenderer->getIsVerticalOrientation() ?
0.70f : 0.70f :
0.45f * (1.778f / mRenderer->getScreenAspectRatio())))); 0.45f * (1.778f / mRenderer->getScreenAspectRatio()))));

View file

@ -1,6 +1,6 @@
// SPDX-License-Identifier: MIT // SPDX-License-Identifier: MIT
// //
// ES-DE // ES-DE Frontend
// ViewController.h // ViewController.h
// //
// Handles overall system navigation including animations and transitions. // Handles overall system navigation including animations and transitions.

View file

@ -1,6 +1,6 @@
// SPDX-License-Identifier: MIT // SPDX-License-Identifier: MIT
// //
// ES-DE // ES-DE Frontend
// HttpReq.cpp // HttpReq.cpp
// //
// HTTP requests using libcurl. // HTTP requests using libcurl.
@ -14,6 +14,7 @@
#include "resources/ResourceManager.h" #include "resources/ResourceManager.h"
#include "utils/FileSystemUtil.h" #include "utils/FileSystemUtil.h"
#include <algorithm>
#include <assert.h> #include <assert.h>
std::string HttpReq::urlEncode(const std::string& s) std::string HttpReq::urlEncode(const std::string& s)
@ -37,42 +38,41 @@ std::string HttpReq::urlEncode(const std::string& s)
} }
HttpReq::HttpReq(const std::string& url, bool scraperRequest) HttpReq::HttpReq(const std::string& url, bool scraperRequest)
: mStatus(REQ_IN_PROGRESS) : mStatus {REQ_IN_PROGRESS}
, mHandle(nullptr) , mHandle {nullptr}
, mTotalBytes {0} , mTotalBytes {0}
, mDownloadedBytes {0} , mDownloadedBytes {0}
, mScraperRequest {scraperRequest} , mScraperRequest {scraperRequest}
{ {
// The multi-handle is cleaned up via a call from GuiScraperSearch after the scraping // The multi-handle is cleaned up via an explicit call to cleanupCurlMulti() from any object
// has been completed for a game, meaning the handle is valid for all curl requests // that uses HttpReq. For example from GuiScraperSearch after scraping has been completed.
// performed for the current game.
if (!sMultiHandle) if (!sMultiHandle)
sMultiHandle = curl_multi_init(); sMultiHandle = curl_multi_init();
mHandle = curl_easy_init(); mHandle = curl_easy_init();
#if defined(USE_BUNDLED_CERTIFICATES)
// Use the bundled curl TLS/SSL certificates (which actually come from the Mozilla project).
// This is enabled by default on Windows. Although there is a possibility to use the OS
// provided Schannel certificates I haven't been able to get this to work, and it also seems
// to be problematic on older Windows versions.
// The bundled certificates are also required on Linux when building an AppImage package as
// distributions such as Debian, Ubuntu, Linux Mint and Manjaro place the TLS certificates in
// a different directory than for example Fedora and openSUSE. This makes curl unusable on
// these latter operating systems unless the bundled file is used.
curl_easy_setopt(mHandle, CURLOPT_CAINFO,
ResourceManager::getInstance()
.getResourcePath(":/certificates/curl-ca-bundle.crt")
.c_str());
#endif
if (mHandle == nullptr) { if (mHandle == nullptr) {
mStatus = REQ_IO_ERROR; mStatus = REQ_IO_ERROR;
onError("curl_easy_init failed"); onError("curl_easy_init failed");
return; return;
} }
// Set the url. if (!mPollThread) {
sStopPoll = false;
mPollThread = std::make_unique<std::thread>(&HttpReq::pollCurl, this);
}
#if defined(USE_BUNDLED_CERTIFICATES)
// Use the bundled curl TLS/SSL certificates (which come from the Mozilla project).
// This is used on Windows and also on Android as there is no way for curl to access
// the system certificates on this OS.
curl_easy_setopt(mHandle, CURLOPT_CAINFO,
ResourceManager::getInstance()
.getResourcePath(":/certificates/curl-ca-bundle.crt")
.c_str());
#endif
// Set the URL.
CURLcode err {curl_easy_setopt(mHandle, CURLOPT_URL, url.c_str())}; CURLcode err {curl_easy_setopt(mHandle, CURLOPT_URL, url.c_str())};
if (err != CURLE_OK) { if (err != CURLE_OK) {
mStatus = REQ_IO_ERROR; mStatus = REQ_IO_ERROR;
@ -172,7 +172,7 @@ HttpReq::HttpReq(const std::string& url, bool scraperRequest)
} }
// Enable the curl progress meter. // Enable the curl progress meter.
err = curl_easy_setopt(mHandle, CURLOPT_NOPROGRESS, 0); err = curl_easy_setopt(mHandle, CURLOPT_NOPROGRESS, mScraperRequest ? 1 : 0);
if (err != CURLE_OK) { if (err != CURLE_OK) {
mStatus = REQ_IO_ERROR; mStatus = REQ_IO_ERROR;
onError(curl_easy_strerror(err)); onError(curl_easy_strerror(err));
@ -188,11 +188,13 @@ HttpReq::HttpReq(const std::string& url, bool scraperRequest)
} }
// Progress meter callback. // Progress meter callback.
err = curl_easy_setopt(mHandle, CURLOPT_XFERINFOFUNCTION, HttpReq::transferProgress); if (!mScraperRequest) {
if (err != CURLE_OK) { err = curl_easy_setopt(mHandle, CURLOPT_XFERINFOFUNCTION, HttpReq::transferProgress);
mStatus = REQ_IO_ERROR; if (err != CURLE_OK) {
onError(curl_easy_strerror(err)); mStatus = REQ_IO_ERROR;
return; onError(curl_easy_strerror(err));
return;
}
} }
// Fail on HTTP status codes >= 400. // Fail on HTTP status codes >= 400.
@ -203,92 +205,33 @@ HttpReq::HttpReq(const std::string& url, bool scraperRequest)
return; return;
} }
// Add the handle to our multi. // Add the handle to the multi. This is done in pollCurl(), running in a separate thread.
CURLMcode merr {curl_multi_add_handle(sMultiHandle, mHandle)}; std::unique_lock<std::mutex> handleLock {sHandleMutex};
if (merr != CURLM_OK) { sAddHandleQueue.push(mHandle);
mStatus = REQ_IO_ERROR; handleLock.unlock();
onError(curl_multi_strerror(merr));
return;
}
curl_multi_wakeup(sMultiHandle);
std::unique_lock<std::mutex> requestLock {sRequestMutex};
sRequests[mHandle] = this; sRequests[mHandle] = this;
requestLock.unlock();
} }
HttpReq::~HttpReq() HttpReq::~HttpReq()
{ {
if (mHandle) { if (mHandle) {
std::unique_lock<std::mutex> requestLock {sRequestMutex};
sRequests.erase(mHandle); sRequests.erase(mHandle);
requestLock.unlock();
CURLMcode merr {curl_multi_remove_handle(sMultiHandle, mHandle)}; std::unique_lock<std::mutex> handleLock {sHandleMutex};
sRemoveHandleQueue.push(mHandle);
handleLock.unlock();
if (merr != CURLM_OK) { curl_multi_wakeup(sMultiHandle);
LOG(LogError) << "Error removing curl_easy handle from curl_multi: "
<< curl_multi_strerror(merr);
}
curl_easy_cleanup(mHandle);
} }
} }
HttpReq::Status HttpReq::status()
{
if (mStatus == REQ_IN_PROGRESS) {
int handleCount {0};
CURLMcode merr {curl_multi_perform(sMultiHandle, &handleCount)};
if (merr != CURLM_OK && merr != CURLM_CALL_MULTI_PERFORM) {
mStatus = REQ_IO_ERROR;
onError(curl_multi_strerror(merr));
return mStatus;
}
int msgsLeft;
CURLMsg* msg;
while ((msg = curl_multi_info_read(sMultiHandle, &msgsLeft)) != nullptr) {
if (msg->msg == CURLMSG_DONE) {
HttpReq* req {sRequests[msg->easy_handle]};
if (req == nullptr) {
LOG(LogError) << "Cannot find easy handle!";
continue;
}
if (msg->data.result == CURLE_OK) {
req->mStatus = REQ_SUCCESS;
}
else if (msg->data.result == CURLE_PEER_FAILED_VERIFICATION) {
req->mStatus = REQ_FAILED_VERIFICATION;
req->onError(curl_easy_strerror(msg->data.result));
}
else if (msg->data.result == CURLE_HTTP_RETURNED_ERROR) {
long responseCode;
curl_easy_getinfo(msg->easy_handle, CURLINFO_RESPONSE_CODE, &responseCode);
if (responseCode == 430 &&
Settings::getInstance()->getString("Scraper") == "screenscraper") {
req->mContent << "You have exceeded your daily scrape quota";
req->mStatus = REQ_SUCCESS;
}
else if (responseCode == 404 && mScraperRequest &&
Settings::getInstance()->getBool("ScraperIgnoreHTTP404Errors")) {
req->mStatus = REQ_RESOURCE_NOT_FOUND;
}
else {
req->onError("Server returned HTTP error code " +
std::to_string(responseCode));
req->mStatus = REQ_BAD_STATUS_CODE;
}
}
else {
req->mStatus = REQ_IO_ERROR;
req->onError(curl_easy_strerror(msg->data.result));
}
}
}
}
return mStatus;
}
std::string HttpReq::getContent() const std::string HttpReq::getContent() const
{ {
assert(mStatus == REQ_SUCCESS); assert(mStatus == REQ_SUCCESS);
@ -298,21 +241,156 @@ std::string HttpReq::getContent() const
int HttpReq::transferProgress( int HttpReq::transferProgress(
void* clientp, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow) void* clientp, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow)
{ {
// Note that it's not guaranteed that the server will actually provide the total size. if (dltotal == 0 && dlnow == 0)
if (dltotal > 0) return CURLE_OK;
static_cast<HttpReq*>(clientp)->mTotalBytes = static_cast<long>(dltotal);
if (dlnow > 0) // We need all the check logic below to make sure we're not attempting to write into
static_cast<HttpReq*>(clientp)->mDownloadedBytes = static_cast<long>(dlnow); // a request that has just been removed by the main thread.
bool validEntry {false};
std::unique_lock<std::mutex> requestLock {sRequestMutex};
if (std::find_if(sRequests.cbegin(), sRequests.cend(), [&clientp](auto&& entry) {
return entry.second == clientp;
}) != sRequests.cend())
validEntry = true;
if (validEntry) {
// Note that it's not guaranteed that the server will actually provide the total size.
if (dltotal > 0)
static_cast<HttpReq*>(clientp)->mTotalBytes = static_cast<long>(dltotal);
if (dlnow > 0)
static_cast<HttpReq*>(clientp)->mDownloadedBytes = static_cast<long>(dlnow);
}
requestLock.unlock();
return CURLE_OK; return CURLE_OK;
} }
size_t HttpReq::writeContent(void* buff, size_t size, size_t nmemb, void* req_ptr) size_t HttpReq::writeContent(void* buff, size_t size, size_t nmemb, void* req_ptr)
{ {
// size = size of an element, nmemb = number of elements. // We need all the check logic below to make sure we're not attempting to write into
std::stringstream& ss {static_cast<HttpReq*>(req_ptr)->mContent}; // a request that has just been removed by the main thread.
ss.write(static_cast<char*>(buff), size * nmemb); bool validEntry {false};
std::unique_lock<std::mutex> requestLock {sRequestMutex};
if (std::find_if(sRequests.cbegin(), sRequests.cend(), [&req_ptr](auto&& entry) {
return entry.second == req_ptr;
}) != sRequests.cend())
validEntry = true;
if (validEntry) {
// size = size of an element, nmemb = number of elements.
std::stringstream& ss {static_cast<HttpReq*>(req_ptr)->mContent};
ss.write(static_cast<char*>(buff), size * nmemb);
}
requestLock.unlock();
// Return value is number of elements successfully read. // Return value is number of elements successfully read.
return nmemb; return nmemb;
} }
void HttpReq::pollCurl()
{
int numfds {0};
do {
if (!sStopPoll)
curl_multi_poll(sMultiHandle, nullptr, 0, 2000, &numfds);
// Check if any easy handles should be added or removed.
std::unique_lock<std::mutex> handleLock {sHandleMutex};
if (sAddHandleQueue.size() > 0) {
// Add the handle to our multi.
CURLMcode merr {curl_multi_add_handle(sMultiHandle, sAddHandleQueue.front())};
std::unique_lock<std::mutex> requestLock {sRequestMutex};
HttpReq* req {sRequests[sAddHandleQueue.front()]};
if (merr != CURLM_OK) {
if (req != nullptr) {
req->mStatus = REQ_IO_ERROR;
req->onError(curl_multi_strerror(merr));
LOG(LogError) << "onError(): " << curl_multi_strerror(merr);
}
}
else {
if (req != nullptr)
req->mStatus = REQ_IN_PROGRESS;
}
sAddHandleQueue.pop();
requestLock.unlock();
}
if (sRemoveHandleQueue.size() > 0) {
// Remove the handle from our multi.
CURLMcode merr {curl_multi_remove_handle(sMultiHandle, sRemoveHandleQueue.front())};
if (merr != CURLM_OK) {
LOG(LogError) << "Error removing curl easy handle from curl multi: "
<< curl_multi_strerror(merr);
}
curl_easy_cleanup(sRemoveHandleQueue.front());
sRemoveHandleQueue.pop();
}
handleLock.unlock();
if (sMultiHandle != nullptr && !sStopPoll) {
int handleCount {0};
std::unique_lock<std::mutex> handleLock {sHandleMutex};
CURLMcode merr {curl_multi_perform(sMultiHandle, &handleCount)};
handleLock.unlock();
if (merr != CURLM_OK && merr != CURLM_CALL_MULTI_PERFORM) {
LOG(LogError) << "Error reading data from multi: " << curl_multi_strerror(merr);
}
int msgsLeft;
CURLMsg* msg;
while (!sStopPoll && (msg = curl_multi_info_read(sMultiHandle, &msgsLeft)) != nullptr) {
if (msg->msg == CURLMSG_DONE) {
std::unique_lock<std::mutex> requestLock {sRequestMutex};
HttpReq* req {sRequests[msg->easy_handle]};
if (req == nullptr) {
LOG(LogError) << "Cannot find easy handle!";
requestLock.unlock();
continue;
}
if (msg->data.result == CURLE_OK) {
req->mStatus = REQ_SUCCESS;
}
else if (msg->data.result == CURLE_PEER_FAILED_VERIFICATION) {
req->mStatus = REQ_FAILED_VERIFICATION;
req->onError(curl_easy_strerror(msg->data.result));
}
else if (msg->data.result == CURLE_HTTP_RETURNED_ERROR) {
long responseCode;
curl_easy_getinfo(msg->easy_handle, CURLINFO_RESPONSE_CODE, &responseCode);
if (responseCode == 430 &&
Settings::getInstance()->getString("Scraper") == "screenscraper") {
req->mContent << "You have exceeded your daily scrape quota";
req->mStatus = REQ_SUCCESS;
}
else if (responseCode == 404 && req->mScraperRequest &&
Settings::getInstance()->getBool("ScraperIgnoreHTTP404Errors")) {
req->mStatus = REQ_RESOURCE_NOT_FOUND;
}
else {
req->mStatus = REQ_BAD_STATUS_CODE;
req->onError("Server returned HTTP error code " +
std::to_string(responseCode));
}
}
else {
req->mStatus = REQ_IO_ERROR;
req->onError(curl_easy_strerror(msg->data.result));
}
requestLock.unlock();
}
}
}
} while (!sStopPoll || !sAddHandleQueue.empty() || !sRemoveHandleQueue.empty());
}

View file

@ -1,6 +1,6 @@
// SPDX-License-Identifier: MIT // SPDX-License-Identifier: MIT
// //
// ES-DE // ES-DE Frontend
// HttpReq.h // HttpReq.h
// //
// HTTP requests using libcurl. // HTTP requests using libcurl.
@ -14,7 +14,10 @@
#include <atomic> #include <atomic>
#include <map> #include <map>
#include <mutex>
#include <queue>
#include <sstream> #include <sstream>
#include <thread>
class HttpReq class HttpReq
{ {
@ -35,8 +38,7 @@ public:
// clang-format on // clang-format on
}; };
// Process any received data and return the status afterwards. Status status() { return mStatus; }
Status status();
std::string getErrorMsg() { return mErrorMsg; } std::string getErrorMsg() { return mErrorMsg; }
std::string getContent() const; std::string getContent() const;
@ -44,9 +46,15 @@ public:
long getDownloadedBytes() { return mDownloadedBytes; } long getDownloadedBytes() { return mDownloadedBytes; }
static std::string urlEncode(const std::string& s); static std::string urlEncode(const std::string& s);
// Called explicitly from any object that uses HttpReq.
static void cleanupCurlMulti() static void cleanupCurlMulti()
{ {
if (sMultiHandle != nullptr) { if (sMultiHandle != nullptr) {
sStopPoll = true;
curl_multi_wakeup(sMultiHandle);
mPollThread->join();
mPollThread.reset();
curl_multi_cleanup(sMultiHandle); curl_multi_cleanup(sMultiHandle);
sMultiHandle = nullptr; sMultiHandle = nullptr;
} }
@ -60,14 +68,24 @@ private:
void onError(const std::string& msg) { mErrorMsg = msg; } void onError(const std::string& msg) { mErrorMsg = msg; }
static inline std::map<CURL*, HttpReq*> sRequests; // Poll constantly to maintain network throughput even during VSyncs and other waiting states.
static inline CURLM* sMultiHandle; void pollCurl();
Status mStatus; static inline CURLM* sMultiHandle;
static inline std::map<CURL*, HttpReq*> sRequests;
static inline std::queue<CURL*> sAddHandleQueue;
static inline std::queue<CURL*> sRemoveHandleQueue;
std::atomic<Status> mStatus;
CURL* mHandle; CURL* mHandle;
static inline std::unique_ptr<std::thread> mPollThread;
static inline std::mutex sHandleMutex;
static inline std::mutex sRequestMutex;
std::stringstream mContent; std::stringstream mContent;
std::string mErrorMsg; std::string mErrorMsg;
static inline std::atomic<bool> sStopPoll = false;
std::atomic<long> mTotalBytes; std::atomic<long> mTotalBytes;
std::atomic<long> mDownloadedBytes; std::atomic<long> mDownloadedBytes;
bool mScraperRequest; bool mScraperRequest;