Add some async network methods. Async send copies to an internal buffer and sends in another thread so doesn't block at all. Added CheckDataAvailable method to the TCPReceive class. Can check or wait for data without blocking.

This commit is contained in:
Ian Curtis 2021-03-15 15:55:39 +00:00
parent 020e2d1b32
commit 4b9221ecd6
6 changed files with 258 additions and 5 deletions

View file

@ -98,8 +98,6 @@
#include "NetBoard.h" #include "NetBoard.h"
#include "Util/Format.h" #include "Util/Format.h"
#include "Util/ByteSwap.h" #include "Util/ByteSwap.h"
#include "TCPSend.h"
#include "TCPReceive.h"
#include <algorithm> #include <algorithm>
// few macros to make debugging a bit less painful // few macros to make debugging a bit less painful
@ -1236,9 +1234,9 @@ bool CNetBoard::Init(UINT8 * netRAMPtr, UINT8 *netBufferPtr)
if (m_config["EmulateNet"].ValueAs<bool>() && m_attached) { if (m_config["EmulateNet"].ValueAs<bool>() && m_attached) {
while (!nets->Connect()) { while (!nets->Connect()) {
DebugLog("Connecting to %s:%i ..\n", addr_out.c_str(), port_out); printf("Connecting to %s:%i ..\n", addr_out.c_str(), port_out);
} }
DebugLog("Successfully connected.\n"); printf("Successfully connected.\n");
} }
return OKAY; return OKAY;

View file

@ -28,6 +28,7 @@
#include "OSD/Thread.h" #include "OSD/Thread.h"
#include <memory> #include <memory>
#include "TCPSend.h" #include "TCPSend.h"
#include "TCPSendAsync.h"
#include "TCPReceive.h" #include "TCPReceive.h"
//#define NET_BUF_SIZE 32800 // 16384 not enough //#define NET_BUF_SIZE 32800 // 16384 not enough

View file

@ -35,10 +35,13 @@ using namespace std::chrono_literals;
TCPReceive::TCPReceive(int port) : TCPReceive::TCPReceive(int port) :
m_listenSocket(nullptr), m_listenSocket(nullptr),
m_receiveSocket(nullptr) m_receiveSocket(nullptr),
m_socketSet(nullptr)
{ {
SDLNet_Init(); SDLNet_Init();
m_socketSet = SDLNet_AllocSocketSet(1);
IPaddress ip; IPaddress ip;
int result = SDLNet_ResolveHost(&ip, nullptr, port); int result = SDLNet_ResolveHost(&ip, nullptr, port);
@ -69,9 +72,23 @@ TCPReceive::~TCPReceive()
m_receiveSocket = nullptr; m_receiveSocket = nullptr;
} }
if (m_socketSet) {
SDLNet_FreeSocketSet(m_socketSet);
m_socketSet = nullptr;
}
SDLNet_Quit(); SDLNet_Quit();
} }
bool TCPReceive::CheckDataAvailable(int timeoutMS)
{
if (!m_receiveSocket) {
return false;
}
return SDLNet_CheckSockets(m_socketSet, timeoutMS) > 0;
}
std::vector<char>& TCPReceive::Receive() std::vector<char>& TCPReceive::Receive()
{ {
if (!m_receiveSocket) { if (!m_receiveSocket) {
@ -119,7 +136,17 @@ void TCPReceive::ListenFunc()
auto socket = SDLNet_TCP_Accept(m_listenSocket); auto socket = SDLNet_TCP_Accept(m_listenSocket);
if (socket) { if (socket) {
// remove old socket if required from socket set
if (m_receiveSocket) {
SDLNet_DelSocket(m_socketSet, (SDLNet_GenericSocket)m_receiveSocket.load());
}
m_receiveSocket = socket; m_receiveSocket = socket;
SDLNet_AddSocket(m_socketSet, (SDLNet_GenericSocket)socket);
// add socket to socket set
DPRINTF("Accepted connection.\n"); DPRINTF("Accepted connection.\n");
} }

View file

@ -34,6 +34,7 @@ public:
TCPReceive(int port); TCPReceive(int port);
~TCPReceive(); ~TCPReceive();
bool CheckDataAvailable(int timeoutMS = 0); // timeoutMS -1 = wait forever until data arrives, 0 = no waiting, 1+ wait time in milliseconds
std::vector<char>& Receive(); std::vector<char>& Receive();
private: private:
@ -42,6 +43,7 @@ private:
TCPsocket m_listenSocket; TCPsocket m_listenSocket;
std::atomic<TCPsocket> m_receiveSocket; std::atomic<TCPsocket> m_receiveSocket;
SDLNet_SocketSet m_socketSet;
std::thread m_listenThread; std::thread m_listenThread;
std::atomic_bool m_running; std::atomic_bool m_running;
std::vector<char> m_recBuffer; std::vector<char> m_recBuffer;

View file

@ -0,0 +1,164 @@
/**
** Supermodel
** A Sega Model 3 Arcade Emulator.
** Copyright 2011-2020 Bart Trzynadlowski, Nik Henson, Ian Curtis,
** Harry Tuttle, and Spindizzi
**
** This file is part of Supermodel.
**
** Supermodel is free software: you can redistribute it and/or modify it under
** the terms of the GNU General Public License as published by the Free
** Software Foundation, either version 3 of the License, or (at your option)
** any later version.
**
** Supermodel is distributed in the hope that it will be useful, but WITHOUT
** ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
** FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
** more details.
**
** You should have received a copy of the GNU General Public License along
** with Supermodel. If not, see <http://www.gnu.org/licenses/>.
**/
#include "TCPSendAsync.h"
#include "OSD/Logger.h"
#include <utility>
#if defined(_DEBUG)
#include <stdio.h>
#define DPRINTF DebugLog
#else
#define DPRINTF(a, ...)
#endif
static const int RETRY_COUNT = 10; // shrugs
TCPSendAsync::TCPSendAsync(std::string& ip, int port) :
m_ip(ip),
m_port(port),
m_socket(nullptr),
m_hasData(false)
{
SDLNet_Init();
m_sendThread = std::thread(&TCPSendAsync::SendThread, this);
}
TCPSendAsync::~TCPSendAsync()
{
if (m_socket) {
SDLNet_TCP_Close(m_socket);
m_socket = nullptr;
}
{
std::unique_lock<std::mutex> lock(m_mutex);
m_dataBuffers.clear();
m_hasData = true; // must set data ready in case of spurious wake up
m_cv.notify_one(); // tell locked thread it can wake up
}
if (m_sendThread.joinable()) {
m_sendThread.join();
}
SDLNet_Quit(); // unload lib (winsock dll for windows)
}
bool TCPSendAsync::Send(const void * data, int length)
{
// If we failed bail out
if (!Connected()) {
DPRINTF("Not connected\n");
return false;
}
DPRINTF("Sending %i bytes\n", length);
if (!length) {
return true; // 0 sized packet will blow our connex
}
auto dataBuffer = std::unique_ptr<char[]>(new char[length+4]);
*((int32_t*)dataBuffer.get()) = length; // set start of buffer to length
memcpy(dataBuffer.get() + 4, data, length); // copy the rest of the data
// lock our array and signal to other thread data is ready
{
std::unique_lock<std::mutex> lock(m_mutex);
m_dataBuffers.emplace_back(std::move(dataBuffer));
m_hasData = true; // must set data ready in case of spurious wake up
m_cv.notify_one(); // tell locked thread it can wake up
}
return true;
}
bool TCPSendAsync::Connected()
{
return m_socket != 0;
}
void TCPSendAsync::SendThread()
{
while (true) {
std::unique_ptr<char[]> sendData;
{
std::unique_lock<std::mutex> lock(m_mutex);
m_cv.wait(lock, [this] { return m_hasData.load(); });
if (m_dataBuffers.empty()) {
return; // if we have woken up with no data assume we need to exit the thread
}
auto front = m_dataBuffers.begin();
sendData = std::move(*front);
m_dataBuffers.erase(front);
m_hasData = false; // potentially we could still have data in pipe, we'll set this at the bottom
// unlock mutex now so we don't block whilst sending
}
if (sendData == nullptr) {
break; // shouldn't be able to get here
}
// get send size (which is packed at the start of the data
auto sendSize = *((int32_t*)sendData.get()) + 4; // send size doesn't include 'header'
int sent = SDLNet_TCP_Send(m_socket, sendData.get(), sendSize); // pack the length at the start of transmission.
if (sent < sendSize) {
SDLNet_TCP_Close(m_socket);
m_socket = nullptr;
break;
}
// we have finished with this buffer so release the data
sendData = nullptr;
//check if we still have data in the pipe, if so set ready state again
{
std::unique_lock<std::mutex> lock(m_mutex);
if (m_dataBuffers.size()) {
m_hasData = true;
m_cv.notify_one();
}
}
}
}
bool TCPSendAsync::Connect()
{
IPaddress ip;
int result = SDLNet_ResolveHost(&ip, m_ip.c_str(), m_port);
if (result == 0) {
m_socket = SDLNet_TCP_Open(&ip);
}
return Connected();
}

View file

@ -0,0 +1,61 @@
/**
** Supermodel
** A Sega Model 3 Arcade Emulator.
** Copyright 2011-2020 Bart Trzynadlowski, Nik Henson, Ian Curtis,
** Harry Tuttle, and Spindizzi
**
** This file is part of Supermodel.
**
** Supermodel is free software: you can redistribute it and/or modify it under
** the terms of the GNU General Public License as published by the Free
** Software Foundation, either version 3 of the License, or (at your option)
** any later version.
**
** Supermodel is distributed in the hope that it will be useful, but WITHOUT
** ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
** FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
** more details.
**
** You should have received a copy of the GNU General Public License along
** with Supermodel. If not, see <http://www.gnu.org/licenses/>.
**/
#ifndef _TCPSENDASYNC_H_
#define _TCPSENDASYNC_H_
#include <string>
#include <vector>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <memory>
#include <thread>
#include "SDLIncludes.h"
class TCPSendAsync
{
public:
TCPSendAsync(std::string& ip, int port);
~TCPSendAsync();
bool Send(const void* data, int length);
bool Connect();
bool Connected();
private:
void SendThread();
std::string m_ip;
int m_port;
TCPsocket m_socket; // sdl socket
std::atomic<bool> m_hasData;
std::condition_variable m_cv;
std::vector<char> m_buffer;
std::mutex m_mutex;
std::thread m_sendThread;
std::vector<std::unique_ptr<char[]>> m_dataBuffers; // each pointer is to an array of data. First word is the size of the data
};
#endif