Added the PacketQueue class to make use of the sequence numbering to process packets in the correct order, and the ZFC9000 end of packet recovery protocol. protocol5

Sat, 23 Jul 2016 12:28:52 +0300

author
Teemu Piippo <teemu@compsta2.com>
date
Sat, 23 Jul 2016 12:28:52 +0300
branch
protocol5
changeset 167
0150f86e68f0
parent 166
af5fa8c43ca8
child 169
febc3ed5435c

Added the PacketQueue class to make use of the sequence numbering to process packets in the correct order, and the ZFC9000 end of packet recovery protocol.

CMakeLists.txt file | annotate | diff | comparison | revisions
sources/network/bytestream.cpp file | annotate | diff | comparison | revisions
sources/network/packetqueue.cpp file | annotate | diff | comparison | revisions
sources/network/packetqueue.h file | annotate | diff | comparison | revisions
sources/network/rconsession.cpp file | annotate | diff | comparison | revisions
sources/network/rconsession.h file | annotate | diff | comparison | revisions
--- a/CMakeLists.txt	Sat Jul 23 12:28:07 2016 +0300
+++ b/CMakeLists.txt	Sat Jul 23 12:28:52 2016 +0300
@@ -20,6 +20,7 @@
 	sources/version.cpp
 	sources/network/bytestream.cpp
 	sources/network/ipaddress.cpp
+	sources/network/packetqueue.cpp
 	sources/network/rconsession.cpp
 	sources/network/udpsocket.cpp
 )
@@ -35,6 +36,7 @@
 	sources/mystring.h
 	sources/network/bytestream.h
 	sources/network/ipaddress.h
+	sources/network/packetqueue.h
 	sources/network/rconsession.h
 	sources/network/udpsocket.h
 	sources/range.h
--- a/sources/network/bytestream.cpp	Sat Jul 23 12:28:07 2016 +0300
+++ b/sources/network/bytestream.cpp	Sat Jul 23 12:28:52 2016 +0300
@@ -129,28 +129,15 @@
  */
 String Bytestream::readString()
 {
-	ByteArray::Iterator stringEndIterator;
+	String result;
 
-	// Where's the end of the string?
-	for (stringEndIterator = getCurrentIterator(); *stringEndIterator != '\0'; ++stringEndIterator)
+	for (char byte; (byte = readByte()) != '\0';)
 	{
-		if (stringEndIterator == m_data.end())
-		{
-			// Past the end of the buffer
-			throw IOError("unterminated or too long string in packet");
-		}
+		if (result.length() < MAX_NETWORK_STRING)
+			result += byte;
 	}
 
-	// Skip past the null terminator.
-	if (*stringEndIterator == '\0')
-		stringEndIterator += 1;
-
-	// Build and return the string, and advance the position.
-	int stringStart = m_position;
-	unsigned int length = stringEndIterator - getCurrentIterator();
-	length = min<int>(length, MAX_NETWORK_STRING);
-	m_position += length;
-	return String::fromBytes(m_data.splice(stringStart, stringStart + length));
+	return result;
 }
 
 /*!
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sources/network/packetqueue.cpp	Sat Jul 23 12:28:52 2016 +0300
@@ -0,0 +1,154 @@
+/*
+	Copyright 2016 Teemu Piippo
+	All rights reserved.
+
+	Redistribution and use in source and binary forms, with or without
+	modification, are permitted provided that the following conditions
+	are met:
+
+	1. Redistributions of source code must retain the above copyright
+	   notice, this list of conditions and the following disclaimer.
+	2. Redistributions in binary form must reproduce the above copyright
+	   notice, this list of conditions and the following disclaimer in the
+	   documentation and/or other materials provided with the distribution.
+	3. Neither the name of the copyright holder nor the names of its
+	   contributors may be used to endorse or promote products derived from
+	   this software without specific prior written permission.
+
+	THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+	"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+	TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
+	PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER
+	OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+	EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+	PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+	PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+	LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+	NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+	SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
+
+#include <algorithm>
+#include "packetqueue.h"
+BEGIN_ZFC_NAMESPACE
+
+/*!
+ * \brief Constructs an empty packet queue.
+ */
+PacketQueue::PacketQueue() :
+    m_currentSequenceNumber(0) {}
+
+/*!
+ * \brief Inserts the packet into the queue, unless the packet is the next packet to be processed.
+ * \param sequenceNumber Sequence number of the packet.
+ * \param data Payload of the packet.
+ * \return True, if the packet was stored, false if the packet should be processed immediately.
+ */
+bool PacketQueue::addPacket(unsigned int sequenceNumber, const ByteArray& data)
+{
+	// Check whether this packet is the one we're supposed to process next.
+	if (sequenceNumber != m_currentSequenceNumber)
+	{
+		// It is not, therefore store it for later.
+		m_queue[sequenceNumber] = data;
+		return true;
+	}
+	else
+	{
+		// It is, therefore the caller processes it, and we can advance to the next packet right away.
+		m_currentSequenceNumber = getNextSequenceNumber();
+		return false;
+	}
+}
+
+/*!
+ * \returns whether there are packets in queue that cannot be processed due to missing in-between packets. If true, the
+ * \returns caller should initiate packet recovery protocol.
+ */
+bool PacketQueue::isStuck() const
+{
+	return m_queue.size() > 0  and  m_queue.find(m_currentSequenceNumber) == m_queue.end();
+}
+
+/*!
+ * \returns whether or not there are packets awaiting processing.
+ */
+bool PacketQueue::hasPacketsToPop() const
+{
+	return m_queue.size() > 0  and  m_queue.find(m_currentSequenceNumber) != m_queue.end();
+}
+
+/*!
+ * \brief Retrieves the next packet to be processed, and removes it from the queue.
+ * \param packet Reference to a byte array to store the packet payload into.
+ * \returns whether the next packet was successfully popped from the queue, or not.
+ */
+bool PacketQueue::popNextPacket(ByteArray& packet)
+{
+	// Find the packet matching our current sequence number.
+	auto iterator = m_queue.find(m_currentSequenceNumber);
+
+	if (iterator != m_queue.end())
+	{
+		// We found the packet we were looking for. Pass it to the caller.
+		packet = iterator->second;
+		// Remove the packet from the queue.
+		m_queue.erase(iterator);
+		// We can now advance to the next packet.
+		m_currentSequenceNumber = getNextSequenceNumber();
+		return true;
+	}
+	else
+	{
+		// We did not find the next packet.
+		return false;
+	}
+}
+
+/*!
+ * \returns the sequence number for the next packet.
+ */
+int PacketQueue::getNextSequenceNumber() const
+{
+	return (m_currentSequenceNumber + 1) % 1024;
+}
+
+/*!
+ * \returns a list of packets that have to be requested from the server.
+ */
+std::set<int> PacketQueue::getLostPackets() const
+{
+	std::set<int> packetsNeeded;
+	std::set<int> packetsInQueue;
+
+	// Build the set of packet numbers we currently have.
+	for (auto pair : m_queue)
+		packetsInQueue.insert(pair.first);
+
+	// Build the set of packets we wish to process. To do this we need the smallest and largest numbers in
+	// packetsInQueue.
+	Range<int> packetRange(min(packetsInQueue), max(packetsInQueue));
+
+	for (int i : packetRange)
+		packetsNeeded.insert(i);
+
+	// The set of lost packets is now the set of packets we want, minus the packets we have.
+	std::set<int> packetsLost;
+	std::set_difference(packetsNeeded.begin(), packetsNeeded.end(),
+	                    packetsInQueue.begin(), packetsInQueue.end(),
+	                    std::inserter(packetsLost, packetsLost.begin()));
+	return packetsLost;
+}
+
+std::set<unsigned int> PacketQueue::getWaitingPackets() const
+{
+	std::set<unsigned int> packetsInQueue;
+
+	// Build the set of packet numbers we currently have.
+	for (auto pair : m_queue)
+		packetsInQueue.insert(pair.first);
+
+	return packetsInQueue;
+}
+
+END_ZFC_NAMESPACE
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sources/network/packetqueue.h	Sat Jul 23 12:28:52 2016 +0300
@@ -0,0 +1,62 @@
+/*
+	Copyright 2016 Teemu Piippo
+	All rights reserved.
+
+	Redistribution and use in source and binary forms, with or without
+	modification, are permitted provided that the following conditions
+	are met:
+
+	1. Redistributions of source code must retain the above copyright
+	   notice, this list of conditions and the following disclaimer.
+	2. Redistributions in binary form must reproduce the above copyright
+	   notice, this list of conditions and the following disclaimer in the
+	   documentation and/or other materials provided with the distribution.
+	3. Neither the name of the copyright holder nor the names of its
+	   contributors may be used to endorse or promote products derived from
+	   this software without specific prior written permission.
+
+	THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+	"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+	TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
+	PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER
+	OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+	EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+	PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+	PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+	LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+	NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+	SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
+
+#pragma once
+#include <map>
+#include <set>
+#include "../main.h"
+#include "../list.h"
+BEGIN_ZFC_NAMESPACE
+
+/*!
+ * \brief The PacketQueue class stores packets awaiting processing.
+ *
+ * The UDP protocol makes no guarantees of packet delivery. Packets may wind up being dropped, or delivered in the wrong
+ * order. This class mitigates that issue by storing the packets into a queue, and popping packets in order.
+ */
+class PacketQueue
+{
+public:
+	PacketQueue();
+
+	bool addPacket(unsigned int sequenceNumber, const ByteArray &data);
+	std::set<unsigned int> getWaitingPackets() const;
+	std::set<int> getLostPackets() const;
+	int getNextSequenceNumber() const;
+	bool hasPacketsToPop() const;
+	bool isStuck() const;
+	bool popNextPacket(ByteArray& packet);
+
+private:
+	std::map<unsigned int, ByteArray> m_queue;
+	unsigned int m_currentSequenceNumber;
+};
+
+END_ZFC_NAMESPACE
--- a/sources/network/rconsession.cpp	Sat Jul 23 12:28:07 2016 +0300
+++ b/sources/network/rconsession.cpp	Sat Jul 23 12:28:52 2016 +0300
@@ -33,12 +33,19 @@
 #include "../interface.h"
 BEGIN_ZFC_NAMESPACE
 
+struct PacketHeader
+{
+	int32_t header;
+	int sequenceNumber;
+};
+
 // -------------------------------------------------------------------------------------------------
 //
 RCONSession::RCONSession() :
 	m_state(RCON_DISCONNECTED),
 	m_lastPing(0),
 	m_adminCount(0),
+    m_lastMissingPacketRequest(0),
 	m_interface(nullptr)
 {
 	if (not m_socket.set_blocking(false))
@@ -111,25 +118,74 @@
 		}
 	}
 
+	// Check for new packets in our socket
 	for (Datagram datagram; m_socket.read(datagram);)
-		handlePacket(datagram);
+	{
+		// Packet came from the wrong address, ignore
+		if (datagram.address != m_address)
+			continue;
+
+		// Parse and cut off the header.
+		PacketHeader header;
+		{
+			// Read the header, and find the sequence number
+			Bytestream stream(datagram.message);
+			header.header = stream.readLong();
+			header.sequenceNumber = (header.header != 0) ? stream.readLong() : -1;
+			datagram.message = datagram.message.splice(stream.position(), datagram.message.size());
+		}
+
+		// Try to store this packet into the queue. However, do not try to store packets without a sequence number.
+		bool stored = false;
+
+		if (header.sequenceNumber != -1)
+			stored = m_packetQueue.addPacket(header.sequenceNumber, datagram.message);
+
+		// If the packet was not stored, we are to just process it right away.
+		if (stored == false)
+			handlePacket(datagram.message);
+	}
+
+	// Check if we can now also process some packets from the queue.
+	if (m_packetQueue.hasPacketsToPop())
+	{
+		ByteArray message;
+		while (m_packetQueue.popNextPacket(message))
+			handlePacket(message);
+	}
+
+	// Check whether there are packets stuck in the queue. If this is the case, we have lost some packets and need to
+	// ask the game server to re-send them.
+	if (m_packetQueue.isStuck()  and  m_lastMissingPacketRequest + 1 < time(NULL))
+	{
+		m_interface->printWarning("Missing packets detected. Packets currently in queue:\n");
+
+		for (int packetNumber : m_packetQueue.getWaitingPackets())
+			m_interface->printWarning("- %d:\n", packetNumber);
+
+		m_lastMissingPacketRequest = time(NULL);
+		ByteArray message;
+		Bytestream stream(message);
+		stream.writeByte(CLRC_MISSINGPACKET);
+
+		for (int packetNumber : m_packetQueue.getLostPackets())
+		{
+			m_interface->printWarning("Requesting lost packet %d\n", packetNumber);
+			stream.writeLong(packetNumber);
+		}
+
+		send(message);
+	}
 }
 
 // -------------------------------------------------------------------------------------------------
 //
-void RCONSession::handlePacket(Datagram& datagram)
+void RCONSession::handlePacket(ByteArray& message)
 {
-	if (datagram.address != m_address)
-		return;
-
-	Bytestream stream(datagram.message);
+	Bytestream stream(message);
 
 	try
 	{
-		int32_t header = stream.readLong();
-		int32_t sequenceNumber = (header != 0) ? stream.readLong() : 0;
-		m_interface->print("Recieved packet with header 0x%x and sequence number #%d\n", header, sequenceNumber);
-
 		while (stream.bytesLeft() > 0)
 		{
 			int header = stream.readByte();
--- a/sources/network/rconsession.h	Sat Jul 23 12:28:07 2016 +0300
+++ b/sources/network/rconsession.h	Sat Jul 23 12:28:52 2016 +0300
@@ -32,6 +32,7 @@
 #include "ipaddress.h"
 #include "udpsocket.h"
 #include "bytestream.h"
+#include "packetqueue.h"
 BEGIN_ZFC_NAMESPACE
 
 // -------------------------------------------------------------------------------------------------
@@ -72,6 +73,7 @@
 	CLRC_DISCONNECT,
 	CLRC_TABCOMPLETE,
 	CLRC_WATCHCVAR,
+	CLRC_MISSINGPACKET,
 };
 
 // -------------------------------------------------------------------------------------------------
@@ -109,7 +111,7 @@
 	const String&               getLevel() const;
 	UDPSocket*                  getSocket();
 	RCONSessionState            getState() const;
-	void                        handlePacket(Datagram& datagram);
+	void                        handlePacket(ByteArray& message);
 	bool                        isActive() const;
 	void                        processServerUpdates(Bytestream& packet);
 	void                        requestTabCompletion(const String& part);
@@ -127,6 +129,7 @@
 	RCONSessionState m_state;
 	IPAddress m_address;
 	UDPSocket m_socket;
+	PacketQueue m_packetQueue;
 	time_t m_lastPing;
 	String m_password;
 	String m_salt;
@@ -135,6 +138,7 @@
 	int m_adminCount;
 	String m_level;
 	String m_lastTabComplete;
+	time_t m_lastMissingPacketRequest;
 	class Interface* m_interface;
 };
 

mercurial