diff --git a/DarkRift.Client/BichannelClientConnection.cs b/DarkRift.Client/BichannelClientConnection.cs
index 21be3ff..527d809 100644
--- a/DarkRift.Client/BichannelClientConnection.cs
+++ b/DarkRift.Client/BichannelClientConnection.cs
@@ -4,12 +4,14 @@
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/
+using DarkRift.Dispatching;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
+using System.Threading;
namespace DarkRift.Client
{
@@ -32,10 +34,15 @@ public sealed class BichannelClientConnection : NetworkClientConnection, IDispos
/// Whether Nagel's algorithm should be disabled or not.
///
public bool NoDelay {
- get => tcpSocket.NoDelay;
- set => tcpSocket.NoDelay = value;
+ get => tcp.Socket.NoDelay;
+ set => tcp.Socket.NoDelay = value;
}
+ ///
+ /// When true no background thread is used and polling must instead be conducted by user by calling PollSockets().
+ ///
+ public bool ExplicitPolling { get; set; }
+
///
public override IEnumerable RemoteEndPoints => new IPEndPoint[] { RemoteTcpEndPoint, RemoteUdpEndPoint };
@@ -45,17 +52,31 @@ public bool NoDelay {
///
/// Backing for .
///
- private ConnectionState connectionState;
+#pragma warning disable IDE1006 // Naming Styles
+ private ConnectionState connectionState
+#pragma warning restore IDE1006 // Naming Styles
+ {
+ set
+ {
+ lock (myLock)
+ {
+ lockedConnectionState = value;
+ }
+ }
+ get
+ {
+ lock (myLock)
+ {
+ return lockedConnectionState;
+ }
+ }
+ }
- ///
- /// The socket used in TCP communication.
- ///
- private readonly Socket tcpSocket;
+ private ConnectionState lockedConnectionState;
+ private readonly object myLock = new object();
- ///
- /// The socket used in UDP communication.
- ///
- private readonly Socket udpSocket;
+ private readonly SynchronousTcpSocket tcp;
+ private readonly SynchronousUdpSocket udp;
///
/// Creates a new bichannel client.
@@ -81,8 +102,10 @@ public BichannelClientConnection(IPAddress ipAddress, int tcpPort, int udpPort,
RemoteTcpEndPoint = new IPEndPoint(ipAddress, tcpPort);
RemoteUdpEndPoint = new IPEndPoint(ipAddress, udpPort);
- tcpSocket = new Socket(ipAddress.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
- udpSocket = new Socket(tcpSocket.AddressFamily, SocketType.Dgram, ProtocolType.Udp);
+ var tcpSocket = new Socket(ipAddress.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
+ tcp = new SynchronousTcpSocket(tcpSocket, Disconnect, HandleMessageReceived);
+ var udpSocket = new Socket(tcpSocket.AddressFamily, SocketType.Dgram, ProtocolType.Udp);
+ udp = new SynchronousUdpSocket(udpSocket, Disconnect, HandleMessageReceived);
NoDelay = noDelay;
}
@@ -103,9 +126,11 @@ public BichannelClientConnection(IPVersion ipVersion, IPAddress ipAddress, int p
RemoteTcpEndPoint = new IPEndPoint(ipAddress, port);
RemoteUdpEndPoint = new IPEndPoint(ipAddress, port);
- tcpSocket = new Socket(addressFamily, SocketType.Stream, ProtocolType.Tcp);
- udpSocket = new Socket(tcpSocket.AddressFamily, SocketType.Dgram, ProtocolType.Udp);
-
+ var tcpSocket = new Socket(addressFamily, SocketType.Stream, ProtocolType.Tcp);
+ tcp = new SynchronousTcpSocket(tcpSocket, Disconnect, HandleMessageReceived);
+ var udpSocket = new Socket(tcpSocket.AddressFamily, SocketType.Dgram, ProtocolType.Udp);
+ udp = new SynchronousUdpSocket(udpSocket, Disconnect, HandleMessageReceived);
+
NoDelay = noDelay;
}
@@ -119,7 +144,7 @@ public override void Connect()
//Connect TCP
try
{
- tcpSocket.Connect(RemoteTcpEndPoint);
+ tcp.Socket.Connect(RemoteTcpEndPoint);
}
catch (SocketException e)
{
@@ -129,8 +154,8 @@ public override void Connect()
try
{
//Bind UDP to a free port
- udpSocket.Bind(new IPEndPoint(((IPEndPoint)tcpSocket.LocalEndPoint).Address, 0));
- udpSocket.Connect(RemoteUdpEndPoint);
+ udp.Socket.Bind(new IPEndPoint(((IPEndPoint)tcp.Socket.LocalEndPoint).Address, 0));
+ udp.Socket.Connect(RemoteUdpEndPoint);
}
catch (SocketException e)
{
@@ -139,28 +164,28 @@ public override void Connect()
//Receive auth token from TCP
byte[] buffer = new byte[9];
- tcpSocket.ReceiveTimeout = 5000;
- int receivedTcp = tcpSocket.Receive(buffer);
- tcpSocket.ReceiveTimeout = 0; //Reset to infinite
+ tcp.Socket.ReceiveTimeout = 5000;
+ int receivedTcp = tcp.Socket.Receive(buffer);
+ tcp.Socket.ReceiveTimeout = 0; //Reset to infinite
if (receivedTcp != 9 || buffer[0] != 0)
{
- tcpSocket.Shutdown(SocketShutdown.Both);
+ tcp.Shutdown();
throw new DarkRiftConnectionException("Timeout waiting for auth token from server.", SocketError.ConnectionAborted);
}
//Transmit token back over UDP to server listening port
- udpSocket.Send(buffer);
+ udp.Socket.Send(buffer);
//Receive response from server to initiate the connection
buffer = new byte[1];
- udpSocket.ReceiveTimeout = 5000;
- int receivedUdp = udpSocket.Receive(buffer);
- udpSocket.ReceiveTimeout = 0; //Reset to infinite
+ udp.Socket.ReceiveTimeout = 5000;
+ int receivedUdp = udp.Socket.Receive(buffer);
+ udp.Socket.ReceiveTimeout = 0; //Reset to infinite
if (receivedUdp != 1 || buffer[0] != 0)
{
- tcpSocket.Shutdown(SocketShutdown.Both);
+ tcp.Shutdown();
throw new DarkRiftConnectionException("Timeout waiting for UDP acknowledgement from server.", SocketError.ConnectionAborted);
}
}
@@ -177,28 +202,17 @@ public override void Connect()
throw;
}
- //Setup the TCP socket to receive a header
- SocketAsyncEventArgs tcpArgs = ObjectCache.GetSocketAsyncEventArgs();
- tcpArgs.BufferList = null;
-
- SetupReceiveHeader(tcpArgs);
- bool headerCompletingAsync = tcpSocket.ReceiveAsync(tcpArgs);
- if (!headerCompletingAsync)
- AsyncReceiveHeaderCompleted(this, tcpArgs);
-
- //Start receiving UDP packets
- SocketAsyncEventArgs udpArgs = ObjectCache.GetSocketAsyncEventArgs();
- udpArgs.BufferList = null;
- udpArgs.SetBuffer(new byte[ushort.MaxValue], 0, ushort.MaxValue);
+ //tcp.Socket.Blocking = false;
+ //tcp.CheckAvailable = false;
- udpArgs.Completed += UdpReceiveCompleted;
-
- bool udpCompletingAsync = udpSocket.ReceiveAsync(udpArgs);
- if (!udpCompletingAsync)
- UdpReceiveCompleted(this, udpArgs);
+ tcp.ResetBuffers();
+ udp.ResetBuffers();
//Mark connected to allow sending
connectionState = ConnectionState.Connected;
+
+ if (!ExplicitPolling)
+ PollingThread.AddWork(PollSockets);
}
///
@@ -207,35 +221,7 @@ public override bool SendMessageReliable(MessageBuffer message)
if (connectionState == ConnectionState.Disconnected)
return false;
- byte[] header = new byte[4];
- BigEndianHelper.WriteBytes(header, 0, message.Count);
-
- SocketAsyncEventArgs args = ObjectCache.GetSocketAsyncEventArgs();
-
- args.SetBuffer(null, 0, 0);
- args.BufferList = new List>()
- {
- new ArraySegment(header),
- new ArraySegment(message.Buffer, message.Offset, message.Count)
- };
- args.UserToken = message;
-
- args.Completed += TcpSendCompleted;
-
- bool completingAsync;
- try
- {
- completingAsync = tcpSocket.SendAsync(args);
- }
- catch (Exception)
- {
- return false;
- }
-
- if (!completingAsync)
- TcpSendCompleted(this, args);
-
- return true;
+ return tcp.SendMessageReliable(message);
}
///
@@ -244,27 +230,7 @@ public override bool SendMessageUnreliable(MessageBuffer message)
if (connectionState == ConnectionState.Disconnected)
return false;
- SocketAsyncEventArgs args = ObjectCache.GetSocketAsyncEventArgs();
- args.BufferList = null;
- args.SetBuffer(message.Buffer, message.Offset, message.Count);
- args.UserToken = message;
-
- args.Completed += UdpSendCompleted;
-
- bool completingAsync;
- try
- {
- completingAsync = udpSocket.SendAsync(args);
- }
- catch (Exception)
- {
- return false;
- }
-
- if (!completingAsync)
- UdpSendCompleted(this, args);
-
- return true;
+ return udp.SendMessageUnreliable(message);
}
///
@@ -274,7 +240,10 @@ public override bool Disconnect()
return false;
connectionState = ConnectionState.Disconnected;
- tcpSocket.Shutdown(SocketShutdown.Both);
+
+ PollingThread.RemoveWork(PollSockets);
+
+ tcp.Shutdown();
return true;
}
@@ -291,389 +260,12 @@ public override IPEndPoint GetRemoteEndPoint(string name)
}
///
- /// Receives TCP header followed by a TCP body, looping the operation becomes asynchronous.
- ///
- /// The socket args to use during the operations.
- private void ReceiveHeaderAndBody(SocketAsyncEventArgs args)
- {
- while (true)
- {
- if (!WasHeaderReceiveSucessful(args))
- {
- HandleDisconnectionDuringHeaderReceive(args);
- return;
- }
-
- if (!IsHeaderReceiveComplete(args))
- {
- UpdateBufferPointers(args);
-
- try
- {
- bool headerContinueCompletingAsync = tcpSocket.ReceiveAsync(args);
- if (headerContinueCompletingAsync)
- return;
- }
- catch (ObjectDisposedException)
- {
- HandleDisconnectionDuringHeaderReceive(args);
- return;
- }
-
- continue;
- }
-
- int bodyLength = ProcessHeader(args);
-
- SetupReceiveBody(args, bodyLength);
- while (true)
- {
- try
- {
- bool bodyCompletingAsync = tcpSocket.ReceiveAsync(args);
- if (bodyCompletingAsync)
- return;
- }
- catch (ObjectDisposedException)
- {
- HandleDisconnectionDuringBodyReceive(args);
- return;
- }
-
- if (!WasBodyReceiveSucessful(args))
- {
- HandleDisconnectionDuringBodyReceive(args);
- return;
- }
-
- if (IsBodyReceiveComplete(args))
- break;
-
- UpdateBufferPointers(args);
- }
-
- MessageBuffer bodyBuffer = ProcessBody(args);
-
- // Start next receive before invoking events
- SetupReceiveHeader(args);
- bool headerCompletingAsync;
- try
- {
- headerCompletingAsync = tcpSocket.ReceiveAsync(args);
- }
- catch (ObjectDisposedException)
- {
- HandleDisconnectionDuringHeaderReceive(args);
- return;
- }
-
- ProcessMessage(bodyBuffer);
-
- if (headerCompletingAsync)
- return;
- }
- }
-
- ///
- /// Event handler for when a TCP header has been received.
- ///
- /// The invoking object.
- /// The socket args used during the operation.
- private void AsyncReceiveHeaderCompleted(object sender, SocketAsyncEventArgs args)
- {
- //We can move straight back into main loop
- ReceiveHeaderAndBody(args);
- }
-
- ///
- /// Event handler for when a TCP body has been received.
- ///
- /// The invoking object.
- /// The socket args used during the operation.
- private void AsyncReceiveBodyCompleted(object sender, SocketAsyncEventArgs args)
- {
- while (true)
- {
- if (!WasBodyReceiveSucessful(args))
- {
- HandleDisconnectionDuringBodyReceive(args);
- return;
- }
-
- if (IsBodyReceiveComplete(args))
- break;
-
- UpdateBufferPointers(args);
-
- try
- {
- bool bodyContinueCompletingAsync = tcpSocket.ReceiveAsync(args);
- if (bodyContinueCompletingAsync)
- return;
- }
- catch (ObjectDisposedException)
- {
- HandleDisconnectionDuringBodyReceive(args);
- return;
- }
- }
-
- MessageBuffer bodyBuffer = ProcessBody(args);
-
- // Start next receive before invoking events
- SetupReceiveHeader(args);
- bool headerCompletingAsync;
- try
- {
- headerCompletingAsync = tcpSocket.ReceiveAsync(args);
- }
- catch (ObjectDisposedException)
- {
- HandleDisconnectionDuringHeaderReceive(args);
- return;
- }
-
- ProcessMessage(bodyBuffer);
-
- if (headerCompletingAsync)
- return;
-
- //Now move back into main loop until no more data is present
- ReceiveHeaderAndBody(args);
- }
-
- ///
- /// Checks if a TCP header was received in its entirety.
- ///
- /// The socket args used during the operation.
- /// If the whole header has been received.
- private bool IsHeaderReceiveComplete(SocketAsyncEventArgs args)
- {
- MessageBuffer headerBuffer = (MessageBuffer)args.UserToken;
-
- return args.Offset + args.BytesTransferred - headerBuffer.Offset >= headerBuffer.Count;
- }
-
- ///
- /// Checks if a TCP body was received in its entirety.
- ///
- /// The socket args used during the operation.
- /// If the whole body has been received.
- private bool IsBodyReceiveComplete(SocketAsyncEventArgs args)
- {
- MessageBuffer bodyBuffer = (MessageBuffer)args.UserToken;
-
- return args.Offset + args.BytesTransferred - bodyBuffer.Offset >= bodyBuffer.Count;
- }
-
- ///
- /// Processes a TCP header received.
- ///
- /// The socket args used during the operation.
- /// The number of bytes in the body.
- private int ProcessHeader(SocketAsyncEventArgs args)
- {
- MessageBuffer headerBuffer = (MessageBuffer)args.UserToken;
-
- int bodyLength = BigEndianHelper.ReadInt32(headerBuffer.Buffer, headerBuffer.Offset);
-
- headerBuffer.Dispose();
-
- args.Completed -= AsyncReceiveHeaderCompleted;
-
- return bodyLength;
- }
-
- ///
- /// Processes a TCP body received.
- ///
- /// The socket args used during the operation.
- /// The buffer received.
- private MessageBuffer ProcessBody(SocketAsyncEventArgs args)
- {
- args.Completed -= AsyncReceiveBodyCompleted;
- return (MessageBuffer)args.UserToken;
- }
-
- ///
- /// Invokes message recevied events and cleans up.
- ///
- /// The TCP body received.
- private void ProcessMessage(MessageBuffer buffer)
- {
- HandleMessageReceived(buffer, SendMode.Reliable);
-
- buffer.Dispose();
- }
-
- ///
- /// Checks if a TCP header was received correctly.
- ///
- /// The socket args used during the operation.
- /// If the receive completed correctly.
- private bool WasHeaderReceiveSucessful(SocketAsyncEventArgs args)
- {
- return args.BytesTransferred != 0 && args.SocketError == SocketError.Success;
- }
-
- ///
- /// Checks if a TCP body was received correctly.
- ///
- /// The socket args used during the operation.
- /// If the receive completed correctly.
- private bool WasBodyReceiveSucessful(SocketAsyncEventArgs args)
- {
- return args.BytesTransferred != 0 && args.SocketError == SocketError.Success;
- }
-
- ///
- /// Handles a disconnection while receiving a TCP header.
- ///
- /// The socket args used during the operation.
- private void HandleDisconnectionDuringHeaderReceive(SocketAsyncEventArgs args)
- {
- Disconnect(args.SocketError);
-
- MessageBuffer buffer = (MessageBuffer)args.UserToken;
- buffer.Dispose();
-
- args.Completed -= AsyncReceiveHeaderCompleted;
- ObjectCache.ReturnSocketAsyncEventArgs(args);
- }
-
- ///
- /// Handles a disconnection while receiving a TCP body.
- ///
- /// The socket args used during the operation.
- private void HandleDisconnectionDuringBodyReceive(SocketAsyncEventArgs args)
- {
- Disconnect(args.SocketError);
-
- MessageBuffer buffer = (MessageBuffer)args.UserToken;
- buffer.Dispose();
-
- args.Completed -= AsyncReceiveBodyCompleted;
- ObjectCache.ReturnSocketAsyncEventArgs(args);
- }
-
- ///
- /// Setup a lsiten operation for a new TCP header.
- ///
- /// The socket args to use during the operation.
- private void SetupReceiveHeader(SocketAsyncEventArgs args)
- {
- MessageBuffer headerBuffer = MessageBuffer.Create(4);
-
- args.SetBuffer(headerBuffer.Buffer, headerBuffer.Offset, 4);
- args.UserToken = headerBuffer;
- args.Completed += AsyncReceiveHeaderCompleted;
- }
-
- ///
- /// Setup a listen operation for a new TCP body.
+ /// Explicitly performs a step of message polling.
///
- /// The socket args to use during the operation.
- /// The number of bytes in the body.
- private void SetupReceiveBody(SocketAsyncEventArgs args, int length)
+ public void PollSockets()
{
- MessageBuffer bodyBuffer = MessageBuffer.Create(length);
- bodyBuffer.Count = length;
-
- args.SetBuffer(bodyBuffer.Buffer, bodyBuffer.Offset, length);
- args.UserToken = bodyBuffer;
- args.Completed += AsyncReceiveBodyCompleted;
- }
-
- ///
- /// Updates the pointers on the buffer to continue a receive operation.
- ///
- /// The socket args to update.
- private void UpdateBufferPointers(SocketAsyncEventArgs args) {
- args.SetBuffer(args.Offset + args.BytesTransferred, args.Count - args.BytesTransferred);
- }
-
- ///
- /// Called when a UDP message arrives.
- ///
- ///
- ///
- private void UdpReceiveCompleted(object sender, SocketAsyncEventArgs e)
- {
- bool completingAsync;
- do
- {
- //If we received a Success then process it
- if (e.SocketError == SocketError.Success)
- {
- using (MessageBuffer buffer = MessageBuffer.Create(e.BytesTransferred))
- {
- Buffer.BlockCopy(e.Buffer, 0, buffer.Buffer, buffer.Offset, e.BytesTransferred);
- buffer.Count = e.BytesTransferred;
-
- completingAsync = udpSocket.ReceiveAsync(e);
-
- //Length of 0 must be a hole punching packet
- if (buffer.Count != 0)
- HandleMessageReceived(buffer, SendMode.Unreliable);
- }
- }
-
- //Ignore ConnectionReset (ICMP Port Unreachable) since NATs will return that when they get
- //the punchthrough packets and they've not already been opened
- else if (e.SocketError == SocketError.ConnectionReset)
- {
- completingAsync = udpSocket.ReceiveAsync(e);
- }
-
- //Anything else is probably bad news
- else
- {
- Disconnect(e.SocketError);
-
- e.Completed -= UdpReceiveCompleted;
- ObjectCache.ReturnSocketAsyncEventArgs(e);
-
- // Leave the loop
- return;
- }
-
- } while (!completingAsync);
- }
-
- ///
- /// Called when a TCP send has completed.
- ///
- ///
- ///
- private void TcpSendCompleted(object sender, SocketAsyncEventArgs e)
- {
- if (e.SocketError != SocketError.Success)
- Disconnect(e.SocketError);
-
- e.Completed -= TcpSendCompleted;
-
- //Always dispose buffer when completed!
- ((MessageBuffer)e.UserToken).Dispose();
-
- ObjectCache.ReturnSocketAsyncEventArgs(e);
- }
-
- ///
- /// Called when a UDP send has completed.
- ///
- ///
- ///
- private void UdpSendCompleted(object sender, SocketAsyncEventArgs e)
- {
- if (e.SocketError != SocketError.Success)
- Disconnect(e.SocketError);
-
- e.Completed -= UdpSendCompleted;
-
- //Always dispose buffer when completed!
- ((MessageBuffer)e.UserToken).Dispose();
-
- ObjectCache.ReturnSocketAsyncEventArgs(e);
+ tcp.PollReceiveHeaderAndBodyNonBlocking();
+ udp.PollReceiveBodyNonBlocking();
}
///
@@ -686,6 +278,8 @@ private void Disconnect(SocketError error)
{
connectionState = ConnectionState.Disconnected;
+ PollingThread.RemoveWork(PollSockets);
+
HandleDisconnection(error);
}
}
@@ -707,8 +301,13 @@ protected override void Dispose(bool disposing)
{
Disconnect();
- tcpSocket.Close();
- udpSocket.Close();
+ PollingThread.RemoveWork(PollSockets);
+
+ tcp.Socket.Close();
+ udp.Socket.Close();
+
+ tcp.Dispose();
+ udp.Dispose();
}
disposedValue = true;
diff --git a/DarkRift.Server/Client.cs b/DarkRift.Server/Client.cs
index 087b822..dbf9c08 100644
--- a/DarkRift.Server/Client.cs
+++ b/DarkRift.Server/Client.cs
@@ -14,6 +14,7 @@
using System.Diagnostics;
using DarkRift.DataStructures;
using DarkRift.Server.Metrics;
+using DarkRift.Client;
namespace DarkRift.Server
{
diff --git a/DarkRift.Server/Plugins/Listeners/Bichannel/BichannelListener.cs b/DarkRift.Server/Plugins/Listeners/Bichannel/BichannelListener.cs
index 352f25a..27ef559 100644
--- a/DarkRift.Server/Plugins/Listeners/Bichannel/BichannelListener.cs
+++ b/DarkRift.Server/Plugins/Listeners/Bichannel/BichannelListener.cs
@@ -159,6 +159,7 @@ private struct UdpSendOperation
internal override bool SendUdpBuffer(EndPoint remoteEndPoint, MessageBuffer message, Action completed)
{
SocketAsyncEventArgs args = ObjectCache.GetSocketAsyncEventArgs();
+ args.SocketError = SocketError.Success;
args.BufferList = null;
args.UserToken = new UdpSendOperation { callback = completed, message = message };
args.SetBuffer(message.Buffer, message.Offset, message.Count);
@@ -168,7 +169,8 @@ internal override bool SendUdpBuffer(EndPoint remoteEndPoint, MessageBuffer mess
bool completingAsync;
try
{
- completingAsync = UdpListener.SendToAsync(args);
+ UdpListener.SendTo(message.Buffer, message.Offset, message.Count, SocketFlags.None, remoteEndPoint);
+ completingAsync = false;
}
catch (Exception e)
{
diff --git a/DarkRift.Server/Plugins/Listeners/Bichannel/BichannelServerConnection.cs b/DarkRift.Server/Plugins/Listeners/Bichannel/BichannelServerConnection.cs
index f77e8bf..81129c9 100644
--- a/DarkRift.Server/Plugins/Listeners/Bichannel/BichannelServerConnection.cs
+++ b/DarkRift.Server/Plugins/Listeners/Bichannel/BichannelServerConnection.cs
@@ -4,9 +4,11 @@
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/
+using DarkRift.Dispatching;
using DarkRift.Server.Metrics;
using System;
using System.Collections.Generic;
+using System.Data;
using System.Linq;
using System.Net;
using System.Net.Sockets;
@@ -22,12 +24,26 @@ internal sealed class BichannelServerConnection : NetworkServerConnection
///
/// Is this client able to send or not.
///
- public bool CanSend { get; private set; }
+ public bool CanSend
+ {
+ get
+ {
+ lock (myLock)
+ return lockedCanSend;
+ }
+ }
///
/// Is this client currently listening for messages or not.
///
- public bool IsListening { get; private set; }
+ public bool IsListening
+ {
+ get
+ {
+ lock (myLock)
+ return lockedIsListening;
+ }
+ }
///
/// The end point of the remote client on TCP.
@@ -43,8 +59,8 @@ internal sealed class BichannelServerConnection : NetworkServerConnection
/// Whether Nagel's algorithm should be disabled or not.
///
public bool NoDelay {
- get => tcpSocket.NoDelay;
- set => tcpSocket.NoDelay = value;
+ get => tcp.Socket.NoDelay;
+ set => tcp.Socket.NoDelay = value;
}
///
@@ -58,11 +74,6 @@ public bool NoDelay {
///
public override IEnumerable RemoteEndPoints => new IPEndPoint[2] { RemoteTcpEndPoint, RemoteUdpEndPoint };
- ///
- /// The socket used in TCP communication.
- ///
- private readonly Socket tcpSocket;
-
///
/// The listener used in UDP communication.
///
@@ -88,16 +99,22 @@ public bool NoDelay {
///
private readonly ICounterMetric bytesReceivedCounterUdp;
+ private bool lockedIsListening;
+ private bool lockedCanSend;
+ private readonly object myLock = new object();
+
+ private readonly SynchronousTcpSocket tcp;
+
internal BichannelServerConnection(Socket tcpSocket, BichannelListenerBase networkListener, IPEndPoint udpEndPoint, long authToken, MetricsCollector metricsCollector)
{
- this.tcpSocket = tcpSocket;
+ this.tcp = new SynchronousTcpSocket(tcpSocket, UnregisterAndDisconnect, HandleTcpMessage);
this.networkListener = networkListener;
this.RemoteTcpEndPoint = (IPEndPoint)tcpSocket.RemoteEndPoint;
this.RemoteUdpEndPoint = udpEndPoint;
this.AuthToken = authToken;
//Mark connected to allow sending
- CanSend = true;
+ lockedCanSend = true;
TaggedMetricBuilder bytesSentCounter = metricsCollector.Counter("bytes_sent", "The number of bytes sent to clients by the listener.", "protocol");
TaggedMetricBuilder bytesReceivedCounter = metricsCollector.Counter("bytes_received", "The number of bytes received from clients by the listener.", "protocol");
@@ -105,6 +122,9 @@ internal BichannelServerConnection(Socket tcpSocket, BichannelListenerBase netwo
bytesSentCounterUdp = bytesSentCounter.WithTags("udp");
bytesReceivedCounterTcp = bytesReceivedCounter.WithTags("tcp");
bytesReceivedCounterUdp = bytesReceivedCounter.WithTags("udp");
+
+ tcp.CheckBodyLength = CheckTcpBodyLength;
+ tcp.OnSendCompleted = TcpSendCompleted;
}
///
@@ -112,21 +132,19 @@ internal BichannelServerConnection(Socket tcpSocket, BichannelListenerBase netwo
///
public override void StartListening()
{
- //Setup the TCP socket to receive a header
- SocketAsyncEventArgs tcpArgs = ObjectCache.GetSocketAsyncEventArgs();
- tcpArgs.BufferList = null;
+ //tcp.Socket.Blocking = false;
+ //tcp.CheckAvailable = false;
- SetupReceiveHeader(tcpArgs);
- // TODO can throw an object disposed exception here if we connect and disconnect very quickly
- bool headerCompletingAsync = tcpSocket.ReceiveAsync(tcpArgs);
- if (!headerCompletingAsync)
- AsyncReceiveHeaderCompleted(this, tcpArgs);
+ tcp.ResetBuffers();
//Register for UDP Messages
networkListener.RegisterUdpConnection(this);
//Mark as listening
- IsListening = true;
+ lock (myLock)
+ lockedIsListening = true;
+
+ PollingThread.AddWork(DoPolling);
}
///
@@ -135,36 +153,7 @@ public override bool SendMessageReliable(MessageBuffer message)
if (!CanSend)
return false;
- byte[] header = new byte[4]; // TODO pool!
- BigEndianHelper.WriteBytes(header, 0, message.Count);
-
- SocketAsyncEventArgs args = ObjectCache.GetSocketAsyncEventArgs();
-
- args.SetBuffer(null, 0, 0);
- args.BufferList = new List>() // TODO pooollllllll!
- {
- new ArraySegment(header),
- new ArraySegment(message.Buffer, message.Offset, message.Count)
- };
- args.UserToken = message;
-
- args.Completed += TcpSendCompleted;
-
-
- bool completingAsync;
- try
- {
- completingAsync = tcpSocket.SendAsync(args);
- }
- catch (Exception)
- {
- return false;
- }
-
- if (!completingAsync)
- TcpSendCompleted(this, args);
-
- return true;
+ return tcp.SendMessageReliable(message);
}
///
@@ -182,12 +171,17 @@ public override bool SendMessageUnreliable(MessageBuffer message)
/// Whether the disconnect was successful.
public override bool Disconnect()
{
- if (!CanSend && !IsListening)
- return false;
+ lock (myLock)
+ {
+ if (!lockedCanSend && !lockedIsListening)
+ return false;
+ }
+
+ PollingThread.RemoveWork(DoPolling);
try
{
- tcpSocket.Shutdown(SocketShutdown.Both);
+ tcp.Shutdown();
}
catch (SocketException)
{
@@ -196,333 +190,20 @@ public override bool Disconnect()
networkListener.UnregisterUdpConnection(this);
- CanSend = false;
- IsListening = false;
-
- return true;
- }
-
- ///
- /// Receives TCP header followed by a TCP body, looping until the operation becomes asynchronous.
- ///
- /// The socket args to use during the operations.
- private void ReceiveHeaderAndBody(SocketAsyncEventArgs args)
- {
- while (true)
- {
- if (!WasHeaderReceiveSucessful(args))
- {
- HandleDisconnectionDuringHeaderReceive(args);
- return;
- }
-
- if (!IsHeaderReceiveComplete(args))
- {
- UpdateBufferPointers(args);
-
- try
- {
- bool headerContinueCompletingAsync = tcpSocket.ReceiveAsync(args);
- if (headerContinueCompletingAsync)
- return;
- }
- catch (ObjectDisposedException)
- {
- HandleDisconnectionDuringHeaderReceive(args);
- return;
- }
-
- continue;
- }
-
- int bodyLength = ProcessHeader(args);
- if (bodyLength >= networkListener.MaxTcpBodyLength)
- {
- Strike("TCP body length was above allowed limits.", 10);
- return;
- }
-
- SetupReceiveBody(args, bodyLength);
- while (true)
- {
- try
- {
- bool bodyCompletingAsync = tcpSocket.ReceiveAsync(args);
- if (bodyCompletingAsync)
- return;
- }
- catch (ObjectDisposedException)
- {
- HandleDisconnectionDuringBodyReceive(args);
- return;
- }
-
- if (!WasBodyReceiveSucessful(args))
- {
- HandleDisconnectionDuringBodyReceive(args);
- return;
- }
-
- if (IsBodyReceiveComplete(args))
- break;
-
- UpdateBufferPointers(args);
- }
-
- MessageBuffer bodyBuffer = ProcessBody(args);
-
- // Start next receive before invoking events
- SetupReceiveHeader(args);
- bool headerCompletingAsync;
- try
- {
- headerCompletingAsync = tcpSocket.ReceiveAsync(args);
- }
- catch (ObjectDisposedException)
- {
- HandleDisconnectionDuringHeaderReceive(args);
- return;
- }
-
- ProcessMessage(bodyBuffer);
-
- if (headerCompletingAsync)
- return;
- }
- }
-
- ///
- /// Event handler for when a TCP header has been received.
- ///
- /// The invoking object.
- /// The socket args used during the operation.
- private void AsyncReceiveHeaderCompleted(object sender, SocketAsyncEventArgs args)
- {
- //We can move straight back into main loop
- ReceiveHeaderAndBody(args);
- }
-
- ///
- /// Event handler for when a TCP body has been received.
- ///
- /// The invoking object.
- /// The socket args used during the operation.
- private void AsyncReceiveBodyCompleted(object sender, SocketAsyncEventArgs args)
- {
- while (true)
+ lock (myLock)
{
- if (!WasBodyReceiveSucessful(args))
- {
- HandleDisconnectionDuringBodyReceive(args);
- return;
- }
-
- if (IsBodyReceiveComplete(args))
- break;
-
- UpdateBufferPointers(args);
-
- try
- {
- bool bodyContinueCompletingAsync = tcpSocket.ReceiveAsync(args);
- if (bodyContinueCompletingAsync)
- return;
- }
- catch (ObjectDisposedException)
- {
- HandleDisconnectionDuringBodyReceive(args);
- return;
- }
+ lockedCanSend = false;
+ lockedIsListening = false;
}
- MessageBuffer bodyBuffer = ProcessBody(args);
-
- // Start next receive before invoking events
- SetupReceiveHeader(args);
- bool headerCompletingAsync;
- try
- {
- headerCompletingAsync = tcpSocket.ReceiveAsync(args);
- }
- catch (ObjectDisposedException)
- {
- HandleDisconnectionDuringHeaderReceive(args);
- return;
- }
-
- ProcessMessage(bodyBuffer);
-
- if (headerCompletingAsync)
- return;
-
- //Now move back into main loop until no more data is present
- ReceiveHeaderAndBody(args);
- }
-
- ///
- /// Checks if a TCP header was received in its entirety.
- ///
- /// The socket args used during the operation.
- /// If the whole header has been received.
- private bool IsHeaderReceiveComplete(SocketAsyncEventArgs args)
- {
- MessageBuffer headerBuffer = (MessageBuffer)args.UserToken;
-
- return args.Offset + args.BytesTransferred - headerBuffer.Offset >= headerBuffer.Count;
- }
-
- ///
- /// Checks if a TCP body was received in its entirety.
- ///
- /// The socket args used during the operation.
- /// If the whole body has been received.
- private bool IsBodyReceiveComplete(SocketAsyncEventArgs args)
- {
- MessageBuffer bodyBuffer = (MessageBuffer)args.UserToken;
-
- return args.Offset + args.BytesTransferred - bodyBuffer.Offset >= bodyBuffer.Count;
- }
-
- ///
- /// Processes a TCP header received.
- ///
- /// The socket args used during the operation.
- /// The number of bytes in the body.
- private int ProcessHeader(SocketAsyncEventArgs args)
- {
- MessageBuffer headerBuffer = (MessageBuffer)args.UserToken;
-
- int bodyLength = BigEndianHelper.ReadInt32(headerBuffer.Buffer, headerBuffer.Offset);
-
- headerBuffer.Dispose();
-
- args.Completed -= AsyncReceiveHeaderCompleted;
-
- return bodyLength;
- }
-
- ///
- /// Processes a TCP body received.
- ///
- /// The socket args used during the operation.
- /// The buffer received.
- private MessageBuffer ProcessBody(SocketAsyncEventArgs args)
- {
- args.Completed -= AsyncReceiveBodyCompleted;
-
- return (MessageBuffer)args.UserToken;
- }
-
- ///
- /// Invokes message recevied events and cleans up.
- ///
- /// The TCP body received.
- private void ProcessMessage(MessageBuffer buffer)
- {
- HandleMessageReceived(buffer, SendMode.Reliable);
-
- int bytesReceived = buffer.Count;
- buffer.Dispose();
-
- bytesReceivedCounterTcp.Increment(bytesReceived + 4);
- }
-
- ///
- /// Checks if a TCP header was received correctly.
- ///
- /// The socket args used during the operation.
- /// If the receive completed correctly.
- private bool WasHeaderReceiveSucessful(SocketAsyncEventArgs args)
- {
- return args.BytesTransferred != 0 && args.SocketError == SocketError.Success;
- }
-
- ///
- /// Checks if a TCP body was received correctly.
- ///
- /// The socket args used during the operation.
- /// If the receive completed correctly.
- private bool WasBodyReceiveSucessful(SocketAsyncEventArgs args)
- {
- return args.BytesTransferred != 0 && args.SocketError == SocketError.Success;
- }
-
- ///
- /// Handles a disconnection while receiving a TCP header.
- ///
- /// The socket args used during the operation.
- private void HandleDisconnectionDuringHeaderReceive(SocketAsyncEventArgs args)
- {
- try
- {
- UnregisterAndDisconnect(args.SocketError);
- }
- finally
- {
- MessageBuffer buffer = (MessageBuffer)args.UserToken;
- buffer.Dispose();
-
- args.Completed -= AsyncReceiveHeaderCompleted;
- ObjectCache.ReturnSocketAsyncEventArgs(args);
- }
- }
-
- ///
- /// Handles a disconnection while receiving a TCP body.
- ///
- /// The socket args used during the operation.
- private void HandleDisconnectionDuringBodyReceive(SocketAsyncEventArgs args)
- {
- try
- {
- UnregisterAndDisconnect(args.SocketError);
- }
- finally
- {
- MessageBuffer buffer = (MessageBuffer)args.UserToken;
- buffer.Dispose();
-
- args.Completed -= AsyncReceiveBodyCompleted;
- ObjectCache.ReturnSocketAsyncEventArgs(args);
- }
- }
-
- ///
- /// Setup a listen operation for a new TCP header.
- ///
- /// The socket args to use during the operation.
- private void SetupReceiveHeader(SocketAsyncEventArgs args)
- {
- MessageBuffer headerBuffer = MessageBuffer.Create(4);
- headerBuffer.Count = 4;
-
- args.SetBuffer(headerBuffer.Buffer, headerBuffer.Offset, 4);
- args.UserToken = headerBuffer;
- args.Completed += AsyncReceiveHeaderCompleted;
+ return true;
}
- ///
- /// Setup a listen operation for a new TCP body.
- ///
- /// The socket args to use during the operation.
- /// The number of bytes in the body.
- private void SetupReceiveBody(SocketAsyncEventArgs args, int length)
+ private void HandleTcpMessage(MessageBuffer buffer, SendMode sendMode)
{
- MessageBuffer bodyBuffer = MessageBuffer.Create(length);
- bodyBuffer.Count = length;
+ this.HandleMessageReceived(buffer, sendMode);
- args.SetBuffer(bodyBuffer.Buffer, bodyBuffer.Offset, length);
- args.UserToken = bodyBuffer;
- args.Completed += AsyncReceiveBodyCompleted;
- }
-
- ///
- /// Updates the pointers on the buffer to continue a receive operation.
- ///
- /// The socket args to update.
- private void UpdateBufferPointers(SocketAsyncEventArgs args)
- {
- args.SetBuffer(args.Offset + args.BytesTransferred, args.Count - args.BytesTransferred);
+ bytesReceivedCounterTcp.Increment(buffer.Count + 4);
}
///
@@ -535,29 +216,6 @@ internal void HandleUdpMessage(MessageBuffer buffer)
bytesReceivedCounterUdp.Increment(buffer.Count);
}
- ///
- /// Called when a TCP send has completed.
- ///
- ///
- ///
- private void TcpSendCompleted(object sender, SocketAsyncEventArgs e)
- {
- if (e.SocketError != SocketError.Success)
- UnregisterAndDisconnect(e.SocketError);
-
- e.Completed -= TcpSendCompleted;
-
- MessageBuffer messageBuffer = (MessageBuffer)e.UserToken;
- int bytesSent = messageBuffer.Count;
-
- //Always dispose buffer when completed!
- messageBuffer.Dispose();
-
- ObjectCache.ReturnSocketAsyncEventArgs(e);
-
- bytesSentCounterTcp.Increment(bytesSent + 4);
- }
-
///
/// Called when a UDP send has completed.
///
@@ -571,18 +229,43 @@ private void UdpSendCompleted(int bytesSent, SocketError e)
bytesSentCounterUdp.Increment(bytesSent);
}
+ private void TcpSendCompleted(MessageBuffer messageBuffer)
+ {
+ bytesSentCounterTcp.Increment(messageBuffer.Count + 4);
+ }
+
+ private bool CheckTcpBodyLength(int bodyLength)
+ {
+ if (bodyLength >= networkListener.MaxTcpBodyLength)
+ {
+ Strike("TCP body length was above allowed limits.", 10);
+ return false;
+ }
+
+ return true;
+ }
+
///
/// Called when a socket error has occured.
///
///
private void UnregisterAndDisconnect(SocketError error)
{
- if (CanSend || IsListening)
+ bool canUnregister = false;
+ lock (myLock)
+ {
+ canUnregister = lockedCanSend || lockedIsListening;
+ }
+
+ if (canUnregister)
{
networkListener.UnregisterUdpConnection(this);
- CanSend = false;
- IsListening = false;
+ lock (myLock)
+ {
+ lockedCanSend = false;
+ lockedIsListening = false;
+ }
HandleDisconnection(error);
}
@@ -599,6 +282,14 @@ public override IPEndPoint GetRemoteEndPoint(string name)
throw new ArgumentException("Endpoint name must either be TCP or UDP");
}
+ ///
+ /// Explicitly performs a step of message polling.
+ ///
+ public void DoPolling()
+ {
+ tcp.PollReceiveHeaderAndBodyNonBlocking();
+ }
+
#region IDisposable Support
private bool disposedValue = false; // To detect redundant calls
@@ -608,10 +299,18 @@ protected override void Dispose(bool disposing)
{
if (disposing)
{
- if (IsListening || CanSend)
+ bool shouldDisconnect = false;
+ lock (myLock)
+ {
+ shouldDisconnect = lockedIsListening || lockedCanSend;
+ }
+
+ if (shouldDisconnect)
Disconnect();
- tcpSocket.Close();
+ PollingThread.RemoveWork(DoPolling);
+
+ tcp.Dispose();
}
disposedValue = true;
diff --git a/DarkRift/Dispatching/PollingThread.cs b/DarkRift/Dispatching/PollingThread.cs
new file mode 100644
index 0000000..c62c45b
--- /dev/null
+++ b/DarkRift/Dispatching/PollingThread.cs
@@ -0,0 +1,108 @@
+using System;
+using System.Collections.Generic;
+using System.Threading;
+
+namespace DarkRift.Dispatching
+{
+ internal static class PollingThread
+ {
+ public static Action ExceptionHandler { get; set; }
+
+ private static readonly object myLock = new object();
+ private static bool threadStarted;
+ private static readonly List workList = new List();
+
+ private static readonly ManualResetEvent stopEvent = new ManualResetEvent(false);
+
+ public static void AddWork(Action work)
+ {
+ lock (myLock)
+ {
+ workList.Add(work);
+
+ if (!threadStarted)
+ {
+ StartThread();
+ }
+ }
+ }
+
+ public static void RemoveWork(Action work)
+ {
+ //this blocking on a polling iteration is intended
+ lock (myLock)
+ {
+ InternalRemoveWork(work);
+ }
+ }
+
+ private static void InternalRemoveWork(Action work)
+ {
+ workList.Remove(work);
+ }
+
+ public static void StopThread()
+ {
+ stopEvent.Set();
+
+ //No support to restart currently since it is presently regarded superfluous (see threadStarted).
+ }
+
+ private static void StartThread()
+ {
+ stopEvent.Reset();
+ threadStarted = true;
+
+ var thread = new Thread(PollingThreadLogic);
+ thread.IsBackground = true;
+ thread.Name = "DarkRift 2 Polling Thread";
+ thread.Start();
+ }
+
+ private static void PollingThreadLogic()
+ {
+ var rng = new Random();
+
+ while (!stopEvent.WaitOne(1))
+ {
+ lock (myLock)
+ {
+ PollingIteration(rng);
+ }
+ }
+ }
+
+ private static void PollingIteration(Random rng)
+ {
+ var work = workList;
+ rng.Shuffle(work);
+
+ for (int i = work.Count - 1; i >= 0; --i)
+ {
+ var item = work[i];
+
+ try
+ {
+ item?.Invoke();
+ }
+ catch (Exception ex)
+ {
+ InternalRemoveWork(item);
+ ExceptionHandler?.Invoke(ex);
+ }
+ }
+ }
+
+ private static void Shuffle(this Random rng, IList array)
+ {
+ int n = array.Count;
+ while (n > 1)
+ {
+ int k = rng.Next(n--);
+ T temp = array[n];
+ array[n] = array[k];
+ array[k] = temp;
+ }
+ }
+ }
+}
diff --git a/DarkRift/SynchronousTcpSocket.cs b/DarkRift/SynchronousTcpSocket.cs
new file mode 100644
index 0000000..41bd224
--- /dev/null
+++ b/DarkRift/SynchronousTcpSocket.cs
@@ -0,0 +1,323 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Net.Sockets;
+using System.Text;
+
+namespace DarkRift
+{
+ internal class SynchronousTcpSocket : IDisposable
+ {
+ public Socket Socket { get; private set; }
+
+ private readonly Action disconnect;
+ private readonly Action handleMessage;
+ public Func CheckBodyLength { get; set; }
+ public Action OnSendCompleted { get; set; }
+
+ private SocketAsyncEventArgs tcpArgs;
+ private TcpReceiveState tcpReceiveState;
+ private int tcpBytesTransferred;
+ private bool disposedValue;
+
+ private enum TcpReceiveState
+ {
+ ReceiveHeader,
+ ReceiveBody,
+ }
+
+ public SynchronousTcpSocket(Socket socket, Action disconnect, Action handleMessage)
+ {
+ Socket = socket;
+ this.disconnect = disconnect;
+ this.handleMessage = handleMessage;
+
+ //TODO: be able to call ResetBuffers()
+ }
+
+ public void ResetBuffers()
+ {
+ //Setup the TCP socket to receive a header
+ tcpArgs = ObjectCache.GetSocketAsyncEventArgs();
+ tcpArgs.BufferList = null;
+
+ SetupReceiveHeader(tcpArgs);
+ }
+
+ public void Shutdown()
+ {
+ Socket.Shutdown(SocketShutdown.Both);
+ }
+
+ public bool SendMessageReliable(MessageBuffer message)
+ {
+ byte[] header = new byte[4];
+ BigEndianHelper.WriteBytes(header, 0, message.Count);
+
+ SocketAsyncEventArgs args = ObjectCache.GetSocketAsyncEventArgs();
+ args.SocketError = SocketError.Success;
+
+ args.SetBuffer(null, 0, 0);
+ args.BufferList = new List>()
+ {
+ new ArraySegment(header),
+ new ArraySegment(message.Buffer, message.Offset, message.Count)
+ };
+ args.UserToken = message;
+
+ try
+ {
+ Socket.Send(args.BufferList);
+ }
+ catch (SocketException ex)
+ {
+ args.SocketError = ex.SocketErrorCode;
+ }
+ catch (Exception)
+ {
+ return false;
+ }
+
+ SendCompleted(args);
+
+ return true;
+ }
+
+ ///
+ /// Receives TCP header followed by a TCP body. The operation
+ /// may exit early in an incomplete state.
+ ///
+ public void PollReceiveHeaderAndBodyNonBlocking()
+ {
+ var args = tcpArgs;
+
+ while (true)
+ {
+ if (tcpReceiveState == TcpReceiveState.ReceiveHeader)
+ {
+ if (!PollReceiveTcpNonBlocking(args))
+ return;
+
+ int bodyLength = ProcessHeader(args);
+ if (CheckBodyLength != null && !CheckBodyLength(bodyLength))
+ {
+ return;
+ }
+ SetupReceiveBody(args, bodyLength);
+ }
+
+ if (tcpReceiveState == TcpReceiveState.ReceiveBody)
+ {
+ if (!PollReceiveTcpNonBlocking(args))
+ return;
+
+ try
+ {
+ MessageBuffer bodyBuffer = ProcessBody(args);
+ ProcessMessage(bodyBuffer);
+ }
+ finally
+ {
+ SetupReceiveHeader(tcpArgs);
+ }
+ }
+ }
+ }
+
+ private bool IsTcpReceiveComplete(SocketAsyncEventArgs args)
+ {
+ if (tcpBytesTransferred == 0)
+ return false;
+
+ MessageBuffer buffer = (MessageBuffer)args.UserToken;
+
+ return args.Offset + tcpBytesTransferred - buffer.Offset >= buffer.Count;
+ }
+
+ public bool CheckAvailable { get; set; } = true;
+
+ private bool PollReceiveTcpNonBlocking(SocketAsyncEventArgs args)
+ {
+ while (!IsTcpReceiveComplete(args))
+ {
+ UpdateBufferPointers(args);
+
+ int bytesAvailable = Socket.Available;
+
+ if (CheckAvailable && bytesAvailable == 0)
+ return false;
+
+ try
+ {
+ args.SocketError = SocketError.Success;
+ tcpBytesTransferred = Socket.Receive(args.Buffer, args.Offset, Math.Min(bytesAvailable, args.Count), SocketFlags.None);
+ }
+ catch (ObjectDisposedException)
+ {
+ HandleDisconnectionDuringTcpReceive(args);
+ return false;
+ }
+ catch (SocketException ex)
+ {
+ if (ex.SocketErrorCode == SocketError.WouldBlock)
+ return false;
+
+ args.SocketError = ex.SocketErrorCode;
+ HandleDisconnectionDuringTcpReceive(args);
+ return false;
+ }
+
+ if (tcpBytesTransferred == 0)
+ {
+ HandleDisconnectionDuringTcpReceive(args);
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ ///
+ /// Processes a TCP header received.
+ ///
+ /// The socket args used during the operation.
+ /// The number of bytes in the body.
+ private int ProcessHeader(SocketAsyncEventArgs args)
+ {
+ MessageBuffer headerBuffer = (MessageBuffer)args.UserToken;
+
+ int bodyLength = BigEndianHelper.ReadInt32(headerBuffer.Buffer, headerBuffer.Offset);
+
+ headerBuffer.Dispose();
+
+ return bodyLength;
+ }
+
+ ///
+ /// Processes a TCP body received.
+ ///
+ /// The socket args used during the operation.
+ /// The buffer received.
+ private MessageBuffer ProcessBody(SocketAsyncEventArgs args)
+ {
+ return (MessageBuffer)args.UserToken;
+ }
+
+ ///
+ /// Invokes message recevied events and cleans up.
+ ///
+ /// The TCP body received.
+ private void ProcessMessage(MessageBuffer buffer)
+ {
+ handleMessage(buffer, SendMode.Reliable);
+
+ buffer.Dispose();
+ }
+
+ ///
+ /// Handles a disconnection while receiving a TCP header.
+ ///
+ /// The socket args used during the operation.
+ private void HandleDisconnectionDuringTcpReceive(SocketAsyncEventArgs args)
+ {
+ disconnect(args.SocketError);
+ }
+
+ ///
+ /// Setup a listen operation for a new TCP header.
+ ///
+ /// The socket args to use during the operation.
+ private void SetupReceiveHeader(SocketAsyncEventArgs args)
+ {
+ tcpBytesTransferred = 0;
+ tcpReceiveState = TcpReceiveState.ReceiveHeader;
+
+ MessageBuffer headerBuffer = MessageBuffer.Create(4);
+
+ args.SetBuffer(headerBuffer.Buffer, headerBuffer.Offset, 4);
+ args.UserToken = headerBuffer;
+ }
+
+ ///
+ /// Setup a listen operation for a new TCP body.
+ ///
+ /// The socket args to use during the operation.
+ /// The number of bytes in the body.
+ private void SetupReceiveBody(SocketAsyncEventArgs args, int length)
+ {
+ tcpBytesTransferred = 0;
+ tcpReceiveState = TcpReceiveState.ReceiveBody;
+
+ MessageBuffer bodyBuffer = MessageBuffer.Create(length);
+ bodyBuffer.Count = length;
+
+ args.SetBuffer(bodyBuffer.Buffer, bodyBuffer.Offset, length);
+ args.UserToken = bodyBuffer;
+ }
+
+ ///
+ /// Updates the pointers on the buffer to continue a receive operation.
+ ///
+ /// The socket args to update.
+ private void UpdateBufferPointers(SocketAsyncEventArgs args)
+ {
+ args.SetBuffer(args.Offset + tcpBytesTransferred, args.Count - tcpBytesTransferred);
+ }
+
+ ///
+ /// Called when a TCP or UDP send has completed.
+ ///
+ ///
+ private void SendCompleted(SocketAsyncEventArgs e)
+ {
+ if (e.SocketError != SocketError.Success)
+ disconnect(e.SocketError);
+
+ MessageBuffer messageBuffer = (MessageBuffer)e.UserToken;
+ OnSendCompleted?.Invoke(messageBuffer);
+
+ //Always dispose buffer when completed!
+ messageBuffer.Dispose();
+
+ ObjectCache.ReturnSocketAsyncEventArgs(e);
+ }
+
+ protected virtual void Dispose(bool disposing)
+ {
+ if (!disposedValue)
+ {
+ if (disposing)
+ {
+ Socket.Close();
+
+ if (tcpArgs != null)
+ {
+ MessageBuffer buffer = (MessageBuffer)tcpArgs.UserToken;
+ buffer.Dispose();
+
+ ObjectCache.ReturnSocketAsyncEventArgs(tcpArgs);
+ tcpArgs = null;
+ }
+ }
+
+ // TODO: free unmanaged resources (unmanaged objects) and override finalizer
+ // TODO: set large fields to null
+ disposedValue = true;
+ }
+ }
+
+ // // TODO: override finalizer only if 'Dispose(bool disposing)' has code to free unmanaged resources
+ // ~SynchronousTcpSocket()
+ // {
+ // // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
+ // Dispose(disposing: false);
+ // }
+
+ public void Dispose()
+ {
+ // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
+ Dispose(disposing: true);
+ GC.SuppressFinalize(this);
+ }
+ }
+}
diff --git a/DarkRift/SynchronousUdpSocket.cs b/DarkRift/SynchronousUdpSocket.cs
new file mode 100644
index 0000000..33cb440
--- /dev/null
+++ b/DarkRift/SynchronousUdpSocket.cs
@@ -0,0 +1,152 @@
+using System;
+using System.Collections.Generic;
+using System.Data;
+using System.Linq;
+using System.Net.Sockets;
+using System.Text;
+
+namespace DarkRift
+{
+ internal class SynchronousUdpSocket : IDisposable
+ {
+ public Socket Socket { get; private set; }
+
+ private readonly Action disconnect;
+ private readonly Action handleMessage;
+
+ private SocketAsyncEventArgs udpArgs;
+ private bool disposedValue;
+
+ public SynchronousUdpSocket(Socket socket, Action disconnect, Action handleMessage)
+ {
+ Socket = socket;
+ this.disconnect = disconnect;
+ this.handleMessage = handleMessage;
+
+ //TODO: be able to call ResetBuffers()
+ }
+
+ public void ResetBuffers()
+ {
+ udpArgs = ObjectCache.GetSocketAsyncEventArgs();
+ udpArgs.BufferList = null;
+ udpArgs.SetBuffer(new byte[ushort.MaxValue], 0, ushort.MaxValue);
+ }
+
+ public bool SendMessageUnreliable(MessageBuffer message)
+ {
+ SocketAsyncEventArgs args = ObjectCache.GetSocketAsyncEventArgs();
+ args.SocketError = SocketError.Success;
+ args.BufferList = null;
+ args.SetBuffer(message.Buffer, message.Offset, message.Count);
+ args.UserToken = message;
+
+ try
+ {
+ Socket.Send(message.Buffer, message.Offset, message.Count, SocketFlags.None);
+ }
+ catch (Exception)
+ {
+ return false;
+ }
+
+ SendCompleted(args);
+
+ return true;
+ }
+
+ ///
+ /// Called when a UDP message arrives.
+ ///
+ public void PollReceiveBodyNonBlocking()
+ {
+ var args = udpArgs;
+
+ while (true)
+ {
+ int bytesAvailable = Socket.Available;
+ if (bytesAvailable == 0)
+ return;
+
+ int bytesTransferred;
+ try
+ {
+ args.SocketError = SocketError.Success;
+ bytesTransferred = Socket.Receive(args.Buffer, ushort.MaxValue, SocketFlags.None);
+ }
+ catch (SocketException ex)
+ {
+ if (ex.SocketErrorCode == SocketError.ConnectionReset)
+ {
+ //Ignore ConnectionReset (ICMP Port Unreachable) since NATs will return that when they get
+ //the punchthrough packets and they've not already been opened
+ return;
+ }
+
+ args.SocketError = ex.SocketErrorCode;
+ disconnect(args.SocketError);
+ return;
+ }
+
+ using (MessageBuffer buffer = MessageBuffer.Create(bytesTransferred))
+ {
+ Buffer.BlockCopy(args.Buffer, 0, buffer.Buffer, buffer.Offset, bytesTransferred);
+ buffer.Count = bytesTransferred;
+
+ //Length of 0 must be a hole punching packet
+ if (buffer.Count != 0)
+ handleMessage(buffer, SendMode.Unreliable);
+ }
+ }
+ }
+
+ ///
+ /// Called when a TCP or UDP send has completed.
+ ///
+ ///
+ private void SendCompleted(SocketAsyncEventArgs e)
+ {
+ if (e.SocketError != SocketError.Success)
+ disconnect(e.SocketError);
+
+ //Always dispose buffer when completed!
+ ((MessageBuffer)e.UserToken).Dispose();
+
+ ObjectCache.ReturnSocketAsyncEventArgs(e);
+ }
+
+ protected virtual void Dispose(bool disposing)
+ {
+ if (!disposedValue)
+ {
+ if (disposing)
+ {
+ if (udpArgs != null)
+ {
+ udpArgs.SetBuffer(null, 0, 0);
+ ObjectCache.ReturnSocketAsyncEventArgs(udpArgs);
+ udpArgs = null;
+ }
+ }
+
+ // TODO: free unmanaged resources (unmanaged objects) and override finalizer
+ // TODO: set large fields to null
+ disposedValue = true;
+ }
+ }
+
+ // // TODO: override finalizer only if 'Dispose(bool disposing)' has code to free unmanaged resources
+ // ~SynchronousUdpSocket()
+ // {
+ // // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
+ // Dispose(disposing: false);
+ // }
+
+ public void Dispose()
+ {
+ // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
+ Dispose(disposing: true);
+ GC.SuppressFinalize(this);
+ }
+ }
+}