Sat, 23 Jul 2016 12:28:52 +0300
Added the PacketQueue class to make use of the sequence numbering to process packets in the correct order, and the ZFC9000 end of packet recovery protocol.
--- a/CMakeLists.txt Sat Jul 23 12:28:07 2016 +0300 +++ b/CMakeLists.txt Sat Jul 23 12:28:52 2016 +0300 @@ -20,6 +20,7 @@ sources/version.cpp sources/network/bytestream.cpp sources/network/ipaddress.cpp + sources/network/packetqueue.cpp sources/network/rconsession.cpp sources/network/udpsocket.cpp ) @@ -35,6 +36,7 @@ sources/mystring.h sources/network/bytestream.h sources/network/ipaddress.h + sources/network/packetqueue.h sources/network/rconsession.h sources/network/udpsocket.h sources/range.h
--- a/sources/network/bytestream.cpp Sat Jul 23 12:28:07 2016 +0300 +++ b/sources/network/bytestream.cpp Sat Jul 23 12:28:52 2016 +0300 @@ -129,28 +129,15 @@ */ String Bytestream::readString() { - ByteArray::Iterator stringEndIterator; + String result; - // Where's the end of the string? - for (stringEndIterator = getCurrentIterator(); *stringEndIterator != '\0'; ++stringEndIterator) + for (char byte; (byte = readByte()) != '\0';) { - if (stringEndIterator == m_data.end()) - { - // Past the end of the buffer - throw IOError("unterminated or too long string in packet"); - } + if (result.length() < MAX_NETWORK_STRING) + result += byte; } - // Skip past the null terminator. - if (*stringEndIterator == '\0') - stringEndIterator += 1; - - // Build and return the string, and advance the position. - int stringStart = m_position; - unsigned int length = stringEndIterator - getCurrentIterator(); - length = min<int>(length, MAX_NETWORK_STRING); - m_position += length; - return String::fromBytes(m_data.splice(stringStart, stringStart + length)); + return result; } /*!
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sources/network/packetqueue.cpp Sat Jul 23 12:28:52 2016 +0300 @@ -0,0 +1,154 @@ +/* + Copyright 2016 Teemu Piippo + All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions + are met: + + 1. Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + 2. Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + 3. Neither the name of the copyright holder nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A + PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER + OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +*/ + +#include <algorithm> +#include "packetqueue.h" +BEGIN_ZFC_NAMESPACE + +/*! + * \brief Constructs an empty packet queue. + */ +PacketQueue::PacketQueue() : + m_currentSequenceNumber(0) {} + +/*! + * \brief Inserts the packet into the queue, unless the packet is the next packet to be processed. + * \param sequenceNumber Sequence number of the packet. + * \param data Payload of the packet. + * \return True, if the packet was stored, false if the packet should be processed immediately. + */ +bool PacketQueue::addPacket(unsigned int sequenceNumber, const ByteArray& data) +{ + // Check whether this packet is the one we're supposed to process next. + if (sequenceNumber != m_currentSequenceNumber) + { + // It is not, therefore store it for later. + m_queue[sequenceNumber] = data; + return true; + } + else + { + // It is, therefore the caller processes it, and we can advance to the next packet right away. + m_currentSequenceNumber = getNextSequenceNumber(); + return false; + } +} + +/*! + * \returns whether there are packets in queue that cannot be processed due to missing in-between packets. If true, the + * \returns caller should initiate packet recovery protocol. + */ +bool PacketQueue::isStuck() const +{ + return m_queue.size() > 0 and m_queue.find(m_currentSequenceNumber) == m_queue.end(); +} + +/*! + * \returns whether or not there are packets awaiting processing. + */ +bool PacketQueue::hasPacketsToPop() const +{ + return m_queue.size() > 0 and m_queue.find(m_currentSequenceNumber) != m_queue.end(); +} + +/*! + * \brief Retrieves the next packet to be processed, and removes it from the queue. + * \param packet Reference to a byte array to store the packet payload into. + * \returns whether the next packet was successfully popped from the queue, or not. + */ +bool PacketQueue::popNextPacket(ByteArray& packet) +{ + // Find the packet matching our current sequence number. + auto iterator = m_queue.find(m_currentSequenceNumber); + + if (iterator != m_queue.end()) + { + // We found the packet we were looking for. Pass it to the caller. + packet = iterator->second; + // Remove the packet from the queue. + m_queue.erase(iterator); + // We can now advance to the next packet. + m_currentSequenceNumber = getNextSequenceNumber(); + return true; + } + else + { + // We did not find the next packet. + return false; + } +} + +/*! + * \returns the sequence number for the next packet. + */ +int PacketQueue::getNextSequenceNumber() const +{ + return (m_currentSequenceNumber + 1) % 1024; +} + +/*! + * \returns a list of packets that have to be requested from the server. + */ +std::set<int> PacketQueue::getLostPackets() const +{ + std::set<int> packetsNeeded; + std::set<int> packetsInQueue; + + // Build the set of packet numbers we currently have. + for (auto pair : m_queue) + packetsInQueue.insert(pair.first); + + // Build the set of packets we wish to process. To do this we need the smallest and largest numbers in + // packetsInQueue. + Range<int> packetRange(min(packetsInQueue), max(packetsInQueue)); + + for (int i : packetRange) + packetsNeeded.insert(i); + + // The set of lost packets is now the set of packets we want, minus the packets we have. + std::set<int> packetsLost; + std::set_difference(packetsNeeded.begin(), packetsNeeded.end(), + packetsInQueue.begin(), packetsInQueue.end(), + std::inserter(packetsLost, packetsLost.begin())); + return packetsLost; +} + +std::set<unsigned int> PacketQueue::getWaitingPackets() const +{ + std::set<unsigned int> packetsInQueue; + + // Build the set of packet numbers we currently have. + for (auto pair : m_queue) + packetsInQueue.insert(pair.first); + + return packetsInQueue; +} + +END_ZFC_NAMESPACE
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sources/network/packetqueue.h Sat Jul 23 12:28:52 2016 +0300 @@ -0,0 +1,62 @@ +/* + Copyright 2016 Teemu Piippo + All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions + are met: + + 1. Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + 2. Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + 3. Neither the name of the copyright holder nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A + PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER + OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +*/ + +#pragma once +#include <map> +#include <set> +#include "../main.h" +#include "../list.h" +BEGIN_ZFC_NAMESPACE + +/*! + * \brief The PacketQueue class stores packets awaiting processing. + * + * The UDP protocol makes no guarantees of packet delivery. Packets may wind up being dropped, or delivered in the wrong + * order. This class mitigates that issue by storing the packets into a queue, and popping packets in order. + */ +class PacketQueue +{ +public: + PacketQueue(); + + bool addPacket(unsigned int sequenceNumber, const ByteArray &data); + std::set<unsigned int> getWaitingPackets() const; + std::set<int> getLostPackets() const; + int getNextSequenceNumber() const; + bool hasPacketsToPop() const; + bool isStuck() const; + bool popNextPacket(ByteArray& packet); + +private: + std::map<unsigned int, ByteArray> m_queue; + unsigned int m_currentSequenceNumber; +}; + +END_ZFC_NAMESPACE
--- a/sources/network/rconsession.cpp Sat Jul 23 12:28:07 2016 +0300 +++ b/sources/network/rconsession.cpp Sat Jul 23 12:28:52 2016 +0300 @@ -33,12 +33,19 @@ #include "../interface.h" BEGIN_ZFC_NAMESPACE +struct PacketHeader +{ + int32_t header; + int sequenceNumber; +}; + // ------------------------------------------------------------------------------------------------- // RCONSession::RCONSession() : m_state(RCON_DISCONNECTED), m_lastPing(0), m_adminCount(0), + m_lastMissingPacketRequest(0), m_interface(nullptr) { if (not m_socket.set_blocking(false)) @@ -111,25 +118,74 @@ } } + // Check for new packets in our socket for (Datagram datagram; m_socket.read(datagram);) - handlePacket(datagram); + { + // Packet came from the wrong address, ignore + if (datagram.address != m_address) + continue; + + // Parse and cut off the header. + PacketHeader header; + { + // Read the header, and find the sequence number + Bytestream stream(datagram.message); + header.header = stream.readLong(); + header.sequenceNumber = (header.header != 0) ? stream.readLong() : -1; + datagram.message = datagram.message.splice(stream.position(), datagram.message.size()); + } + + // Try to store this packet into the queue. However, do not try to store packets without a sequence number. + bool stored = false; + + if (header.sequenceNumber != -1) + stored = m_packetQueue.addPacket(header.sequenceNumber, datagram.message); + + // If the packet was not stored, we are to just process it right away. + if (stored == false) + handlePacket(datagram.message); + } + + // Check if we can now also process some packets from the queue. + if (m_packetQueue.hasPacketsToPop()) + { + ByteArray message; + while (m_packetQueue.popNextPacket(message)) + handlePacket(message); + } + + // Check whether there are packets stuck in the queue. If this is the case, we have lost some packets and need to + // ask the game server to re-send them. + if (m_packetQueue.isStuck() and m_lastMissingPacketRequest + 1 < time(NULL)) + { + m_interface->printWarning("Missing packets detected. Packets currently in queue:\n"); + + for (int packetNumber : m_packetQueue.getWaitingPackets()) + m_interface->printWarning("- %d:\n", packetNumber); + + m_lastMissingPacketRequest = time(NULL); + ByteArray message; + Bytestream stream(message); + stream.writeByte(CLRC_MISSINGPACKET); + + for (int packetNumber : m_packetQueue.getLostPackets()) + { + m_interface->printWarning("Requesting lost packet %d\n", packetNumber); + stream.writeLong(packetNumber); + } + + send(message); + } } // ------------------------------------------------------------------------------------------------- // -void RCONSession::handlePacket(Datagram& datagram) +void RCONSession::handlePacket(ByteArray& message) { - if (datagram.address != m_address) - return; - - Bytestream stream(datagram.message); + Bytestream stream(message); try { - int32_t header = stream.readLong(); - int32_t sequenceNumber = (header != 0) ? stream.readLong() : 0; - m_interface->print("Recieved packet with header 0x%x and sequence number #%d\n", header, sequenceNumber); - while (stream.bytesLeft() > 0) { int header = stream.readByte();
--- a/sources/network/rconsession.h Sat Jul 23 12:28:07 2016 +0300 +++ b/sources/network/rconsession.h Sat Jul 23 12:28:52 2016 +0300 @@ -32,6 +32,7 @@ #include "ipaddress.h" #include "udpsocket.h" #include "bytestream.h" +#include "packetqueue.h" BEGIN_ZFC_NAMESPACE // ------------------------------------------------------------------------------------------------- @@ -72,6 +73,7 @@ CLRC_DISCONNECT, CLRC_TABCOMPLETE, CLRC_WATCHCVAR, + CLRC_MISSINGPACKET, }; // ------------------------------------------------------------------------------------------------- @@ -109,7 +111,7 @@ const String& getLevel() const; UDPSocket* getSocket(); RCONSessionState getState() const; - void handlePacket(Datagram& datagram); + void handlePacket(ByteArray& message); bool isActive() const; void processServerUpdates(Bytestream& packet); void requestTabCompletion(const String& part); @@ -127,6 +129,7 @@ RCONSessionState m_state; IPAddress m_address; UDPSocket m_socket; + PacketQueue m_packetQueue; time_t m_lastPing; String m_password; String m_salt; @@ -135,6 +138,7 @@ int m_adminCount; String m_level; String m_lastTabComplete; + time_t m_lastMissingPacketRequest; class Interface* m_interface; };