From 4b9221ecd69f48d794a9733951357aaf7bd15d42 Mon Sep 17 00:00:00 2001 From: Ian Curtis Date: Mon, 15 Mar 2021 15:55:39 +0000 Subject: [PATCH] Add some async network methods. Async send copies to an internal buffer and sends in another thread so doesn't block at all. Added CheckDataAvailable method to the TCPReceive class. Can check or wait for data without blocking. --- Src/Network/NetBoard.cpp | 6 +- Src/Network/NetBoard.h | 1 + Src/Network/TCPReceive.cpp | 29 ++++++- Src/Network/TCPReceive.h | 2 + Src/Network/TCPSendAsync.cpp | 164 +++++++++++++++++++++++++++++++++++ Src/Network/TCPSendAsync.h | 61 +++++++++++++ 6 files changed, 258 insertions(+), 5 deletions(-) create mode 100644 Src/Network/TCPSendAsync.cpp create mode 100644 Src/Network/TCPSendAsync.h diff --git a/Src/Network/NetBoard.cpp b/Src/Network/NetBoard.cpp index 0d62bee..d2740fb 100644 --- a/Src/Network/NetBoard.cpp +++ b/Src/Network/NetBoard.cpp @@ -98,8 +98,6 @@ #include "NetBoard.h" #include "Util/Format.h" #include "Util/ByteSwap.h" -#include "TCPSend.h" -#include "TCPReceive.h" #include // few macros to make debugging a bit less painful @@ -1236,9 +1234,9 @@ bool CNetBoard::Init(UINT8 * netRAMPtr, UINT8 *netBufferPtr) if (m_config["EmulateNet"].ValueAs() && m_attached) { while (!nets->Connect()) { - DebugLog("Connecting to %s:%i ..\n", addr_out.c_str(), port_out); + printf("Connecting to %s:%i ..\n", addr_out.c_str(), port_out); } - DebugLog("Successfully connected.\n"); + printf("Successfully connected.\n"); } return OKAY; diff --git a/Src/Network/NetBoard.h b/Src/Network/NetBoard.h index 6aa60fa..bb2cd52 100644 --- a/Src/Network/NetBoard.h +++ b/Src/Network/NetBoard.h @@ -28,6 +28,7 @@ #include "OSD/Thread.h" #include #include "TCPSend.h" +#include "TCPSendAsync.h" #include "TCPReceive.h" //#define NET_BUF_SIZE 32800 // 16384 not enough diff --git a/Src/Network/TCPReceive.cpp b/Src/Network/TCPReceive.cpp index 7b26e22..02dc87e 100644 --- a/Src/Network/TCPReceive.cpp +++ b/Src/Network/TCPReceive.cpp @@ -35,10 +35,13 @@ using namespace std::chrono_literals; TCPReceive::TCPReceive(int port) : m_listenSocket(nullptr), - m_receiveSocket(nullptr) + m_receiveSocket(nullptr), + m_socketSet(nullptr) { SDLNet_Init(); + m_socketSet = SDLNet_AllocSocketSet(1); + IPaddress ip; int result = SDLNet_ResolveHost(&ip, nullptr, port); @@ -69,9 +72,23 @@ TCPReceive::~TCPReceive() m_receiveSocket = nullptr; } + if (m_socketSet) { + SDLNet_FreeSocketSet(m_socketSet); + m_socketSet = nullptr; + } + SDLNet_Quit(); } +bool TCPReceive::CheckDataAvailable(int timeoutMS) +{ + if (!m_receiveSocket) { + return false; + } + + return SDLNet_CheckSockets(m_socketSet, timeoutMS) > 0; +} + std::vector& TCPReceive::Receive() { if (!m_receiveSocket) { @@ -119,7 +136,17 @@ void TCPReceive::ListenFunc() auto socket = SDLNet_TCP_Accept(m_listenSocket); if (socket) { + + // remove old socket if required from socket set + if (m_receiveSocket) { + SDLNet_DelSocket(m_socketSet, (SDLNet_GenericSocket)m_receiveSocket.load()); + } + m_receiveSocket = socket; + + SDLNet_AddSocket(m_socketSet, (SDLNet_GenericSocket)socket); + + // add socket to socket set DPRINTF("Accepted connection.\n"); } diff --git a/Src/Network/TCPReceive.h b/Src/Network/TCPReceive.h index cd4c399..77e4370 100644 --- a/Src/Network/TCPReceive.h +++ b/Src/Network/TCPReceive.h @@ -34,6 +34,7 @@ public: TCPReceive(int port); ~TCPReceive(); + bool CheckDataAvailable(int timeoutMS = 0); // timeoutMS -1 = wait forever until data arrives, 0 = no waiting, 1+ wait time in milliseconds std::vector& Receive(); private: @@ -42,6 +43,7 @@ private: TCPsocket m_listenSocket; std::atomic m_receiveSocket; + SDLNet_SocketSet m_socketSet; std::thread m_listenThread; std::atomic_bool m_running; std::vector m_recBuffer; diff --git a/Src/Network/TCPSendAsync.cpp b/Src/Network/TCPSendAsync.cpp new file mode 100644 index 0000000..8253c3f --- /dev/null +++ b/Src/Network/TCPSendAsync.cpp @@ -0,0 +1,164 @@ +/** + ** Supermodel + ** A Sega Model 3 Arcade Emulator. + ** Copyright 2011-2020 Bart Trzynadlowski, Nik Henson, Ian Curtis, + ** Harry Tuttle, and Spindizzi + ** + ** This file is part of Supermodel. + ** + ** Supermodel is free software: you can redistribute it and/or modify it under + ** the terms of the GNU General Public License as published by the Free + ** Software Foundation, either version 3 of the License, or (at your option) + ** any later version. + ** + ** Supermodel is distributed in the hope that it will be useful, but WITHOUT + ** ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + ** FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + ** more details. + ** + ** You should have received a copy of the GNU General Public License along + ** with Supermodel. If not, see . + **/ + +#include "TCPSendAsync.h" +#include "OSD/Logger.h" +#include + +#if defined(_DEBUG) +#include +#define DPRINTF DebugLog +#else +#define DPRINTF(a, ...) +#endif + +static const int RETRY_COUNT = 10; // shrugs + +TCPSendAsync::TCPSendAsync(std::string& ip, int port) : + m_ip(ip), + m_port(port), + m_socket(nullptr), + m_hasData(false) +{ + SDLNet_Init(); + + m_sendThread = std::thread(&TCPSendAsync::SendThread, this); +} + +TCPSendAsync::~TCPSendAsync() +{ + if (m_socket) { + SDLNet_TCP_Close(m_socket); + m_socket = nullptr; + } + + { + std::unique_lock lock(m_mutex); + m_dataBuffers.clear(); + m_hasData = true; // must set data ready in case of spurious wake up + m_cv.notify_one(); // tell locked thread it can wake up + } + + if (m_sendThread.joinable()) { + m_sendThread.join(); + } + + SDLNet_Quit(); // unload lib (winsock dll for windows) +} + +bool TCPSendAsync::Send(const void * data, int length) +{ + // If we failed bail out + if (!Connected()) { + DPRINTF("Not connected\n"); + return false; + } + + DPRINTF("Sending %i bytes\n", length); + + if (!length) { + return true; // 0 sized packet will blow our connex + } + + auto dataBuffer = std::unique_ptr(new char[length+4]); + + *((int32_t*)dataBuffer.get()) = length; // set start of buffer to length + memcpy(dataBuffer.get() + 4, data, length); // copy the rest of the data + + // lock our array and signal to other thread data is ready + { + std::unique_lock lock(m_mutex); + m_dataBuffers.emplace_back(std::move(dataBuffer)); + + m_hasData = true; // must set data ready in case of spurious wake up + m_cv.notify_one(); // tell locked thread it can wake up + } + + return true; +} + +bool TCPSendAsync::Connected() +{ + return m_socket != 0; +} + +void TCPSendAsync::SendThread() +{ + while (true) { + + std::unique_ptr sendData; + + { + std::unique_lock lock(m_mutex); + m_cv.wait(lock, [this] { return m_hasData.load(); }); + + if (m_dataBuffers.empty()) { + return; // if we have woken up with no data assume we need to exit the thread + } + + auto front = m_dataBuffers.begin(); + sendData = std::move(*front); + m_dataBuffers.erase(front); + m_hasData = false; // potentially we could still have data in pipe, we'll set this at the bottom + + // unlock mutex now so we don't block whilst sending + } + + if (sendData == nullptr) { + break; // shouldn't be able to get here + } + + // get send size (which is packed at the start of the data + auto sendSize = *((int32_t*)sendData.get()) + 4; // send size doesn't include 'header' + + int sent = SDLNet_TCP_Send(m_socket, sendData.get(), sendSize); // pack the length at the start of transmission. + if (sent < sendSize) { + SDLNet_TCP_Close(m_socket); + m_socket = nullptr; + break; + } + + // we have finished with this buffer so release the data + sendData = nullptr; + + //check if we still have data in the pipe, if so set ready state again + { + std::unique_lock lock(m_mutex); + if (m_dataBuffers.size()) { + m_hasData = true; + m_cv.notify_one(); + } + } + } +} + +bool TCPSendAsync::Connect() +{ + IPaddress ip; + int result = SDLNet_ResolveHost(&ip, m_ip.c_str(), m_port); + + if (result == 0) { + m_socket = SDLNet_TCP_Open(&ip); + } + + return Connected(); +} diff --git a/Src/Network/TCPSendAsync.h b/Src/Network/TCPSendAsync.h new file mode 100644 index 0000000..b70169c --- /dev/null +++ b/Src/Network/TCPSendAsync.h @@ -0,0 +1,61 @@ +/** + ** Supermodel + ** A Sega Model 3 Arcade Emulator. + ** Copyright 2011-2020 Bart Trzynadlowski, Nik Henson, Ian Curtis, + ** Harry Tuttle, and Spindizzi + ** + ** This file is part of Supermodel. + ** + ** Supermodel is free software: you can redistribute it and/or modify it under + ** the terms of the GNU General Public License as published by the Free + ** Software Foundation, either version 3 of the License, or (at your option) + ** any later version. + ** + ** Supermodel is distributed in the hope that it will be useful, but WITHOUT + ** ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + ** FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + ** more details. + ** + ** You should have received a copy of the GNU General Public License along + ** with Supermodel. If not, see . + **/ + +#ifndef _TCPSENDASYNC_H_ +#define _TCPSENDASYNC_H_ + +#include +#include +#include +#include +#include +#include +#include +#include "SDLIncludes.h" + +class TCPSendAsync +{ +public: + TCPSendAsync(std::string& ip, int port); + ~TCPSendAsync(); + + bool Send(const void* data, int length); + bool Connect(); + bool Connected(); +private: + + void SendThread(); + + std::string m_ip; + int m_port; + TCPsocket m_socket; // sdl socket + std::atomic m_hasData; + std::condition_variable m_cv; + std::vector m_buffer; + std::mutex m_mutex; + std::thread m_sendThread; + + std::vector> m_dataBuffers; // each pointer is to an array of data. First word is the size of the data + +}; + +#endif