added packet queue as a new head protocol5

Wed, 27 Jan 2021 12:34:56 +0200

author
Teemu Piippo <teemu@hecknology.net>
date
Wed, 27 Jan 2021 12:34:56 +0200
branch
protocol5
changeset 172
0b0bc8045d28
parent 171
d0fba0d7ad03
child 177
131518f86af6

added packet queue as a new head

CMakeLists.txt file | annotate | diff | comparison | revisions
sources/network/packetqueue.cpp file | annotate | diff | comparison | revisions
sources/network/packetqueue.h file | annotate | diff | comparison | revisions
sources/network/rconsession.cpp file | annotate | diff | comparison | revisions
sources/network/rconsession.h file | annotate | diff | comparison | revisions
sources/network/udpsocket.cpp file | annotate | diff | comparison | revisions
--- a/CMakeLists.txt	Wed Jan 27 12:34:26 2021 +0200
+++ b/CMakeLists.txt	Wed Jan 27 12:34:56 2021 +0200
@@ -20,6 +20,7 @@
 	sources/version.cpp
 	sources/network/bytestream.cpp
 	sources/network/ipaddress.cpp
+	sources/network/packetqueue.cpp
 	sources/network/rconsession.cpp
 	sources/network/udpsocket.cpp
 )
@@ -35,6 +36,7 @@
 	sources/mystring.h
 	sources/network/bytestream.h
 	sources/network/ipaddress.h
+	sources/network/packetqueue.h
 	sources/network/rconsession.h
 	sources/network/udpsocket.h
 	sources/range.h
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sources/network/packetqueue.cpp	Wed Jan 27 12:34:56 2021 +0200
@@ -0,0 +1,154 @@
+/*
+	Copyright 2016 Teemu Piippo
+	All rights reserved.
+
+	Redistribution and use in source and binary forms, with or without
+	modification, are permitted provided that the following conditions
+	are met:
+
+	1. Redistributions of source code must retain the above copyright
+	   notice, this list of conditions and the following disclaimer.
+	2. Redistributions in binary form must reproduce the above copyright
+	   notice, this list of conditions and the following disclaimer in the
+	   documentation and/or other materials provided with the distribution.
+	3. Neither the name of the copyright holder nor the names of its
+	   contributors may be used to endorse or promote products derived from
+	   this software without specific prior written permission.
+
+	THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+	"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+	TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
+	PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER
+	OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+	EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+	PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+	PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+	LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+	NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+	SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
+
+#include <algorithm>
+#include "packetqueue.h"
+BEGIN_ZFC_NAMESPACE
+
+/*!
+ * \brief Constructs an empty packet queue.
+ */
+PacketQueue::PacketQueue() :
+    m_currentSequenceNumber(0) {}
+
+/*!
+ * \brief Inserts the packet into the queue, unless the packet is the next packet to be processed.
+ * \param sequenceNumber Sequence number of the packet.
+ * \param data Payload of the packet.
+ * \return True, if the packet was stored, false if the packet should be processed immediately.
+ */
+bool PacketQueue::addPacket(unsigned int sequenceNumber, const ByteArray& data)
+{
+	// Check whether this packet is the one we're supposed to process next.
+	if (sequenceNumber != m_currentSequenceNumber)
+	{
+		// It is not, therefore store it for later.
+		m_queue[sequenceNumber] = data;
+		return true;
+	}
+	else
+	{
+		// It is, therefore the caller processes it, and we can advance to the next packet right away.
+		m_currentSequenceNumber = getNextSequenceNumber();
+		return false;
+	}
+}
+
+/*!
+ * \returns whether there are packets in queue that cannot be processed due to missing in-between packets. If true, the
+ * \returns caller should initiate packet recovery protocol.
+ */
+bool PacketQueue::isStuck() const
+{
+	return m_queue.size() > 0  and  m_queue.find(m_currentSequenceNumber) == m_queue.end();
+}
+
+/*!
+ * \returns whether or not there are packets awaiting processing.
+ */
+bool PacketQueue::hasPacketsToPop() const
+{
+	return m_queue.size() > 0  and  m_queue.find(m_currentSequenceNumber) != m_queue.end();
+}
+
+/*!
+ * \brief Retrieves the next packet to be processed, and removes it from the queue.
+ * \param packet Reference to a byte array to store the packet payload into.
+ * \returns whether the next packet was successfully popped from the queue, or not.
+ */
+bool PacketQueue::popNextPacket(ByteArray& packet)
+{
+	// Find the packet matching our current sequence number.
+	auto iterator = m_queue.find(m_currentSequenceNumber);
+
+	if (iterator != m_queue.end())
+	{
+		// We found the packet we were looking for. Pass it to the caller.
+		packet = iterator->second;
+		// Remove the packet from the queue.
+		m_queue.erase(iterator);
+		// We can now advance to the next packet.
+		m_currentSequenceNumber = getNextSequenceNumber();
+		return true;
+	}
+	else
+	{
+		// We did not find the next packet.
+		return false;
+	}
+}
+
+/*!
+ * \returns the sequence number for the next packet.
+ */
+int PacketQueue::getNextSequenceNumber() const
+{
+	return (m_currentSequenceNumber + 1) % 1024;
+}
+
+/*!
+ * \returns a list of packets that have to be requested from the server.
+ */
+std::set<int> PacketQueue::getLostPackets() const
+{
+	std::set<int> packetsNeeded;
+	std::set<int> packetsInQueue;
+
+	// Build the set of packet numbers we currently have.
+	for (auto pair : m_queue)
+		packetsInQueue.insert(pair.first);
+
+	// Build the set of packets we wish to process. To do this we need the smallest and largest numbers in
+	// packetsInQueue.
+	Range<int> packetRange(min(packetsInQueue), max(packetsInQueue));
+
+	for (int i : packetRange)
+		packetsNeeded.insert(i);
+
+	// The set of lost packets is now the set of packets we want, minus the packets we have.
+	std::set<int> packetsLost;
+	std::set_difference(packetsNeeded.begin(), packetsNeeded.end(),
+	                    packetsInQueue.begin(), packetsInQueue.end(),
+	                    std::inserter(packetsLost, packetsLost.begin()));
+	return packetsLost;
+}
+
+std::set<unsigned int> PacketQueue::getWaitingPackets() const
+{
+	std::set<unsigned int> packetsInQueue;
+
+	// Build the set of packet numbers we currently have.
+	for (auto pair : m_queue)
+		packetsInQueue.insert(pair.first);
+
+	return packetsInQueue;
+}
+
+END_ZFC_NAMESPACE
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sources/network/packetqueue.h	Wed Jan 27 12:34:56 2021 +0200
@@ -0,0 +1,62 @@
+/*
+	Copyright 2016 Teemu Piippo
+	All rights reserved.
+
+	Redistribution and use in source and binary forms, with or without
+	modification, are permitted provided that the following conditions
+	are met:
+
+	1. Redistributions of source code must retain the above copyright
+	   notice, this list of conditions and the following disclaimer.
+	2. Redistributions in binary form must reproduce the above copyright
+	   notice, this list of conditions and the following disclaimer in the
+	   documentation and/or other materials provided with the distribution.
+	3. Neither the name of the copyright holder nor the names of its
+	   contributors may be used to endorse or promote products derived from
+	   this software without specific prior written permission.
+
+	THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+	"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+	TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
+	PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER
+	OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+	EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+	PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+	PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+	LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+	NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+	SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
+
+#pragma once
+#include <map>
+#include <set>
+#include "../main.h"
+#include "../list.h"
+BEGIN_ZFC_NAMESPACE
+
+/*!
+ * \brief The PacketQueue class stores packets awaiting processing.
+ *
+ * The UDP protocol makes no guarantees of packet delivery. Packets may wind up being dropped, or delivered in the wrong
+ * order. This class mitigates that issue by storing the packets into a queue, and popping packets in order.
+ */
+class PacketQueue
+{
+public:
+	PacketQueue();
+
+	bool addPacket(unsigned int sequenceNumber, const ByteArray &data);
+	std::set<unsigned int> getWaitingPackets() const;
+	std::set<int> getLostPackets() const;
+	int getNextSequenceNumber() const;
+	bool hasPacketsToPop() const;
+	bool isStuck() const;
+	bool popNextPacket(ByteArray& packet);
+
+private:
+	std::map<unsigned int, ByteArray> m_queue;
+	unsigned int m_currentSequenceNumber;
+};
+
+END_ZFC_NAMESPACE
--- a/sources/network/rconsession.cpp	Wed Jan 27 12:34:26 2021 +0200
+++ b/sources/network/rconsession.cpp	Wed Jan 27 12:34:56 2021 +0200
@@ -45,6 +45,7 @@
 	m_state(RCON_DISCONNECTED),
 	m_lastPing(0),
 	m_adminCount(0),
+    m_lastMissingPacketRequest(0),
 	m_interface(nullptr)
 {
 	if (not m_socket.set_blocking(false))
@@ -122,7 +123,58 @@
 	{
 		// Only process packets that originate from the game server.
 		if (datagram.address == m_address)
-			handlePacket(datagram.message);
+		{
+			// Parse and cut off the header.
+			PacketHeader header;
+			{
+				// Read the header, and find the sequence number
+				Bytestream stream(datagram.message);
+				header.header = stream.readLong();
+				header.sequenceNumber = (header.header != 0) ? stream.readLong() : -1;
+				datagram.message = datagram.message.splice(stream.position(), datagram.message.size());
+			}
+
+			// Try to store this packet into the queue. However, do not try to store packets without a sequence number.
+			bool stored = false;
+
+			if (header.sequenceNumber != -1)
+				stored = m_packetQueue.addPacket(header.sequenceNumber, datagram.message);
+
+			// If the packet was not stored, we are to just process it right away.
+			if (stored == false)
+				handlePacket(datagram.message);
+		}
+	}
+
+	// Check if we can now also process some packets from the queue.
+	if (m_packetQueue.hasPacketsToPop())
+	{
+		ByteArray message;
+		while (m_packetQueue.popNextPacket(message))
+			handlePacket(message);
+	}
+
+	// Check whether there are packets stuck in the queue. If this is the case, we have lost some packets and need to
+	// ask the game server to re-send them.
+	if (m_packetQueue.isStuck()  and  m_lastMissingPacketRequest + 1 < time(NULL))
+	{
+		m_interface->printWarning("Missing packets detected. Packets currently in queue:\n");
+
+		for (int packetNumber : m_packetQueue.getWaitingPackets())
+			m_interface->printWarning("- %d:\n", packetNumber);
+
+		m_lastMissingPacketRequest = time(NULL);
+		ByteArray message;
+		Bytestream stream(message);
+		stream.writeByte(CLRC_MISSINGPACKET);
+
+		for (int packetNumber : m_packetQueue.getLostPackets())
+		{
+			m_interface->printWarning("Requesting lost packet %d\n", packetNumber);
+			stream.writeLong(packetNumber);
+		}
+
+		send(message);
 	}
 }
 
--- a/sources/network/rconsession.h	Wed Jan 27 12:34:26 2021 +0200
+++ b/sources/network/rconsession.h	Wed Jan 27 12:34:56 2021 +0200
@@ -32,6 +32,7 @@
 #include "ipaddress.h"
 #include "udpsocket.h"
 #include "bytestream.h"
+#include "packetqueue.h"
 BEGIN_ZFC_NAMESPACE
 
 // -------------------------------------------------------------------------------------------------
@@ -72,6 +73,7 @@
 	CLRC_DISCONNECT,
 	CLRC_TABCOMPLETE,
 	CLRC_WATCHCVAR,
+	CLRC_MISSINGPACKET,
 };
 
 // -------------------------------------------------------------------------------------------------
@@ -127,6 +129,7 @@
 	RCONSessionState m_state;
 	IPAddress m_address;
 	UDPSocket m_socket;
+	PacketQueue m_packetQueue;
 	time_t m_lastPing;
 	String m_password;
 	String m_salt;
@@ -135,6 +138,7 @@
 	int m_adminCount;
 	String m_level;
 	String m_lastTabComplete;
+	time_t m_lastMissingPacketRequest;
 	class Interface* m_interface;
 };
 
--- a/sources/network/udpsocket.cpp	Wed Jan 27 12:34:26 2021 +0200
+++ b/sources/network/udpsocket.cpp	Wed Jan 27 12:34:56 2021 +0200
@@ -129,6 +129,12 @@
 		return false;
 	}
 
+	if (length < 4)
+	{
+		m_error = "The server sent a too short packet";
+		return false;
+	}
+
 	unsigned char decodedPacket[MAX_DATAGRAM_LENGTH];
 	int decodedLength = sizeof decodedPacket;
 	HUFFMAN_Decode (reinterpret_cast<unsigned char*> (HuffmanBuffer),

mercurial