From f34598ef71ec74f821b6642f7e2df4b359fd548a Mon Sep 17 00:00:00 2001 From: "petter.hansson" Date: Mon, 24 Oct 2022 07:07:49 +0200 Subject: [PATCH 1/7] Synchronous sends and receives for Unity (WIP) --- DarkRift.Client/BichannelClientConnection.cs | 341 +++++++++--------- DarkRift.Server/Client.cs | 1 + .../Listeners/Bichannel/BichannelListener.cs | 4 +- .../Bichannel/BichannelServerConnection.cs | 4 +- 4 files changed, 169 insertions(+), 181 deletions(-) diff --git a/DarkRift.Client/BichannelClientConnection.cs b/DarkRift.Client/BichannelClientConnection.cs index 21be3ff..75b0970 100644 --- a/DarkRift.Client/BichannelClientConnection.cs +++ b/DarkRift.Client/BichannelClientConnection.cs @@ -10,6 +10,7 @@ using System.Net; using System.Net.Sockets; using System.Text; +using System.Threading; namespace DarkRift.Client { @@ -45,7 +46,28 @@ public bool NoDelay { /// /// Backing for . /// - private ConnectionState connectionState; +#pragma warning disable IDE1006 // Naming Styles + private ConnectionState connectionState +#pragma warning restore IDE1006 // Naming Styles + { + set + { + lock (myLock) + { + lockedConnectionState = value; + } + } + get + { + lock (myLock) + { + return lockedConnectionState; + } + } + } + + private ConnectionState lockedConnectionState; + private readonly object myLock = new object(); /// /// The socket used in TCP communication. @@ -57,6 +79,18 @@ public bool NoDelay { /// private readonly Socket udpSocket; + private SocketAsyncEventArgs tcpArgs; + private TcpReceiveState tcpReceiveState; + private int tcpBytesTransferred; + private readonly ManualResetEvent stopPollingSignal = new ManualResetEvent(false); + private readonly ManualResetEvent stoppedPollingSignal = new ManualResetEvent(false); + + private enum TcpReceiveState + { + ReceiveHeader, + ReceiveBody, + } + /// /// Creates a new bichannel client. /// @@ -178,13 +212,9 @@ public override void Connect() } //Setup the TCP socket to receive a header - SocketAsyncEventArgs tcpArgs = ObjectCache.GetSocketAsyncEventArgs(); + tcpArgs = ObjectCache.GetSocketAsyncEventArgs(); tcpArgs.BufferList = null; - - SetupReceiveHeader(tcpArgs); - bool headerCompletingAsync = tcpSocket.ReceiveAsync(tcpArgs); - if (!headerCompletingAsync) - AsyncReceiveHeaderCompleted(this, tcpArgs); + SetupReceiveHeader(tcpArgs); //Start receiving UDP packets SocketAsyncEventArgs udpArgs = ObjectCache.GetSocketAsyncEventArgs(); @@ -199,6 +229,12 @@ public override void Connect() //Mark connected to allow sending connectionState = ConnectionState.Connected; + + //Calling synchronously in game loop would probably be better as it keeps messages in socket buffer instead. + + var pollingThread = new Thread(DoPolling); + pollingThread.Name = nameof(BichannelClientConnection) + "." + nameof(DoPolling); + pollingThread.Start(); } /// @@ -211,6 +247,7 @@ public override bool SendMessageReliable(MessageBuffer message) BigEndianHelper.WriteBytes(header, 0, message.Count); SocketAsyncEventArgs args = ObjectCache.GetSocketAsyncEventArgs(); + args.SocketError = SocketError.Success; args.SetBuffer(null, 0, 0); args.BufferList = new List>() @@ -225,7 +262,8 @@ public override bool SendMessageReliable(MessageBuffer message) bool completingAsync; try { - completingAsync = tcpSocket.SendAsync(args); + tcpSocket.Send(args.BufferList); + completingAsync = false; } catch (Exception) { @@ -245,6 +283,7 @@ public override bool SendMessageUnreliable(MessageBuffer message) return false; SocketAsyncEventArgs args = ObjectCache.GetSocketAsyncEventArgs(); + args.SocketError = SocketError.Success; args.BufferList = null; args.SetBuffer(message.Buffer, message.Offset, message.Count); args.UserToken = message; @@ -254,7 +293,8 @@ public override bool SendMessageUnreliable(MessageBuffer message) bool completingAsync; try { - completingAsync = udpSocket.SendAsync(args); + udpSocket.Send(message.Buffer, message.Offset, message.Count, SocketFlags.None); + completingAsync = false; } catch (Exception) { @@ -274,6 +314,11 @@ public override bool Disconnect() return false; connectionState = ConnectionState.Disconnected; + + stopPollingSignal.Set(); + stoppedPollingSignal.WaitOne(); + + tcpSocket.Shutdown(SocketShutdown.Both); return true; @@ -290,156 +335,112 @@ public override IPEndPoint GetRemoteEndPoint(string name) throw new ArgumentException("Endpoint name must either be TCP or UDP"); } + private void DoPolling() + { + while (!stopPollingSignal.WaitOne(1)) + { + PollReceiveHeaderAndBody(); + } + + stoppedPollingSignal.Set(); + } + /// /// Receives TCP header followed by a TCP body, looping the operation becomes asynchronous. /// - /// The socket args to use during the operations. - private void ReceiveHeaderAndBody(SocketAsyncEventArgs args) + private void PollReceiveHeaderAndBody() { + var args = tcpArgs; + while (true) { - if (!WasHeaderReceiveSucessful(args)) - { - HandleDisconnectionDuringHeaderReceive(args); - return; - } - - if (!IsHeaderReceiveComplete(args)) + if (tcpReceiveState == TcpReceiveState.ReceiveHeader) { - UpdateBufferPointers(args); - - try + while (!IsHeaderReceiveComplete(args)) { - bool headerContinueCompletingAsync = tcpSocket.ReceiveAsync(args); - if (headerContinueCompletingAsync) - return; - } - catch (ObjectDisposedException) - { - HandleDisconnectionDuringHeaderReceive(args); - return; - } + UpdateBufferPointers(args); - continue; - } + int bytesAvailable = tcpSocket.Available; + int bytesReceived; - int bodyLength = ProcessHeader(args); + if (bytesAvailable == 0) + return; - SetupReceiveBody(args, bodyLength); - while (true) - { - try - { - bool bodyCompletingAsync = tcpSocket.ReceiveAsync(args); - if (bodyCompletingAsync) + try + { + bytesReceived = tcpSocket.Receive(args.Buffer, args.Offset, Math.Min(bytesAvailable, args.Count), SocketFlags.None); + tcpBytesTransferred = bytesReceived; + } + catch (ObjectDisposedException) + { + HandleDisconnectionDuringTcpReceive(args); return; - } - catch (ObjectDisposedException) - { - HandleDisconnectionDuringBodyReceive(args); - return; - } + } + catch (SocketException ex) + { + if (ex.SocketErrorCode == SocketError.WouldBlock) + return; + + args.SocketError = ex.SocketErrorCode; + HandleDisconnectionDuringTcpReceive(args); + return; + } - if (!WasBodyReceiveSucessful(args)) - { - HandleDisconnectionDuringBodyReceive(args); - return; + if (bytesReceived == 0) + return; } - if (IsBodyReceiveComplete(args)) - break; - - UpdateBufferPointers(args); + int bodyLength = ProcessHeader(args); + SetupReceiveBody(args, bodyLength); } - MessageBuffer bodyBuffer = ProcessBody(args); - - // Start next receive before invoking events - SetupReceiveHeader(args); - bool headerCompletingAsync; - try - { - headerCompletingAsync = tcpSocket.ReceiveAsync(args); - } - catch (ObjectDisposedException) + if (tcpReceiveState == TcpReceiveState.ReceiveBody) { - HandleDisconnectionDuringHeaderReceive(args); - return; - } - - ProcessMessage(bodyBuffer); - - if (headerCompletingAsync) - return; - } - } + while (!IsBodyReceiveComplete(args)) + { + UpdateBufferPointers(args); - /// - /// Event handler for when a TCP header has been received. - /// - /// The invoking object. - /// The socket args used during the operation. - private void AsyncReceiveHeaderCompleted(object sender, SocketAsyncEventArgs args) - { - //We can move straight back into main loop - ReceiveHeaderAndBody(args); - } + int bytesAvailable = tcpSocket.Available; + int bytesReceived; - /// - /// Event handler for when a TCP body has been received. - /// - /// The invoking object. - /// The socket args used during the operation. - private void AsyncReceiveBodyCompleted(object sender, SocketAsyncEventArgs args) - { - while (true) - { - if (!WasBodyReceiveSucessful(args)) - { - HandleDisconnectionDuringBodyReceive(args); - return; - } + if (bytesAvailable == 0) + return; - if (IsBodyReceiveComplete(args)) - break; + try + { + bytesReceived = tcpSocket.Receive(args.Buffer, args.Offset, Math.Min(bytesAvailable, args.Count), SocketFlags.None); + tcpBytesTransferred = bytesReceived; + } + catch (ObjectDisposedException) + { + HandleDisconnectionDuringTcpReceive(args); + return; + } + catch (SocketException ex) + { + if (ex.SocketErrorCode == SocketError.WouldBlock) + return; + + args.SocketError = ex.SocketErrorCode; + HandleDisconnectionDuringTcpReceive(args); + return; + } - UpdateBufferPointers(args); + if (bytesReceived == 0) + return; + } - try - { - bool bodyContinueCompletingAsync = tcpSocket.ReceiveAsync(args); - if (bodyContinueCompletingAsync) - return; - } - catch (ObjectDisposedException) - { - HandleDisconnectionDuringBodyReceive(args); - return; + try + { + MessageBuffer bodyBuffer = ProcessBody(args); + ProcessMessage(bodyBuffer); + } + finally + { + SetupReceiveHeader(tcpArgs); + } } } - - MessageBuffer bodyBuffer = ProcessBody(args); - - // Start next receive before invoking events - SetupReceiveHeader(args); - bool headerCompletingAsync; - try - { - headerCompletingAsync = tcpSocket.ReceiveAsync(args); - } - catch (ObjectDisposedException) - { - HandleDisconnectionDuringHeaderReceive(args); - return; - } - - ProcessMessage(bodyBuffer); - - if (headerCompletingAsync) - return; - - //Now move back into main loop until no more data is present - ReceiveHeaderAndBody(args); } /// @@ -449,9 +450,12 @@ private void AsyncReceiveBodyCompleted(object sender, SocketAsyncEventArgs args) /// If the whole header has been received. private bool IsHeaderReceiveComplete(SocketAsyncEventArgs args) { + if (tcpBytesTransferred == 0) + return false; + MessageBuffer headerBuffer = (MessageBuffer)args.UserToken; - return args.Offset + args.BytesTransferred - headerBuffer.Offset >= headerBuffer.Count; + return args.Offset + tcpBytesTransferred - headerBuffer.Offset >= headerBuffer.Count; } /// @@ -463,7 +467,7 @@ private bool IsBodyReceiveComplete(SocketAsyncEventArgs args) { MessageBuffer bodyBuffer = (MessageBuffer)args.UserToken; - return args.Offset + args.BytesTransferred - bodyBuffer.Offset >= bodyBuffer.Count; + return args.Offset + tcpBytesTransferred - bodyBuffer.Offset >= bodyBuffer.Count; } /// @@ -479,8 +483,6 @@ private int ProcessHeader(SocketAsyncEventArgs args) headerBuffer.Dispose(); - args.Completed -= AsyncReceiveHeaderCompleted; - return bodyLength; } @@ -491,7 +493,6 @@ private int ProcessHeader(SocketAsyncEventArgs args) /// The buffer received. private MessageBuffer ProcessBody(SocketAsyncEventArgs args) { - args.Completed -= AsyncReceiveBodyCompleted; return (MessageBuffer)args.UserToken; } @@ -506,54 +507,20 @@ private void ProcessMessage(MessageBuffer buffer) buffer.Dispose(); } - /// - /// Checks if a TCP header was received correctly. - /// - /// The socket args used during the operation. - /// If the receive completed correctly. - private bool WasHeaderReceiveSucessful(SocketAsyncEventArgs args) - { - return args.BytesTransferred != 0 && args.SocketError == SocketError.Success; - } - - /// - /// Checks if a TCP body was received correctly. - /// - /// The socket args used during the operation. - /// If the receive completed correctly. - private bool WasBodyReceiveSucessful(SocketAsyncEventArgs args) - { - return args.BytesTransferred != 0 && args.SocketError == SocketError.Success; - } - /// /// Handles a disconnection while receiving a TCP header. /// /// The socket args used during the operation. - private void HandleDisconnectionDuringHeaderReceive(SocketAsyncEventArgs args) + private void HandleDisconnectionDuringTcpReceive(SocketAsyncEventArgs args) { Disconnect(args.SocketError); + /* MessageBuffer buffer = (MessageBuffer)args.UserToken; buffer.Dispose(); - args.Completed -= AsyncReceiveHeaderCompleted; - ObjectCache.ReturnSocketAsyncEventArgs(args); - } - - /// - /// Handles a disconnection while receiving a TCP body. - /// - /// The socket args used during the operation. - private void HandleDisconnectionDuringBodyReceive(SocketAsyncEventArgs args) - { - Disconnect(args.SocketError); - - MessageBuffer buffer = (MessageBuffer)args.UserToken; - buffer.Dispose(); - - args.Completed -= AsyncReceiveBodyCompleted; ObjectCache.ReturnSocketAsyncEventArgs(args); + */ } /// @@ -562,11 +529,13 @@ private void HandleDisconnectionDuringBodyReceive(SocketAsyncEventArgs args) /// The socket args to use during the operation. private void SetupReceiveHeader(SocketAsyncEventArgs args) { + tcpBytesTransferred = 0; + tcpReceiveState = TcpReceiveState.ReceiveHeader; + MessageBuffer headerBuffer = MessageBuffer.Create(4); args.SetBuffer(headerBuffer.Buffer, headerBuffer.Offset, 4); args.UserToken = headerBuffer; - args.Completed += AsyncReceiveHeaderCompleted; } /// @@ -576,12 +545,14 @@ private void SetupReceiveHeader(SocketAsyncEventArgs args) /// The number of bytes in the body. private void SetupReceiveBody(SocketAsyncEventArgs args, int length) { + tcpBytesTransferred = 0; + tcpReceiveState = TcpReceiveState.ReceiveBody; + MessageBuffer bodyBuffer = MessageBuffer.Create(length); bodyBuffer.Count = length; args.SetBuffer(bodyBuffer.Buffer, bodyBuffer.Offset, length); args.UserToken = bodyBuffer; - args.Completed += AsyncReceiveBodyCompleted; } /// @@ -589,7 +560,7 @@ private void SetupReceiveBody(SocketAsyncEventArgs args, int length) /// /// The socket args to update. private void UpdateBufferPointers(SocketAsyncEventArgs args) { - args.SetBuffer(args.Offset + args.BytesTransferred, args.Count - args.BytesTransferred); + args.SetBuffer(args.Offset + tcpBytesTransferred, args.Count - tcpBytesTransferred); } /// @@ -686,6 +657,9 @@ private void Disconnect(SocketError error) { connectionState = ConnectionState.Disconnected; + stopPollingSignal.Set(); + stoppedPollingSignal.WaitOne(); + HandleDisconnection(error); } } @@ -709,6 +683,15 @@ protected override void Dispose(bool disposing) tcpSocket.Close(); udpSocket.Close(); + + if (tcpArgs != null) + { + MessageBuffer buffer = (MessageBuffer)tcpArgs.UserToken; + buffer.Dispose(); + + ObjectCache.ReturnSocketAsyncEventArgs(tcpArgs); + tcpArgs = null; + } } disposedValue = true; diff --git a/DarkRift.Server/Client.cs b/DarkRift.Server/Client.cs index 087b822..dbf9c08 100644 --- a/DarkRift.Server/Client.cs +++ b/DarkRift.Server/Client.cs @@ -14,6 +14,7 @@ using System.Diagnostics; using DarkRift.DataStructures; using DarkRift.Server.Metrics; +using DarkRift.Client; namespace DarkRift.Server { diff --git a/DarkRift.Server/Plugins/Listeners/Bichannel/BichannelListener.cs b/DarkRift.Server/Plugins/Listeners/Bichannel/BichannelListener.cs index 352f25a..27ef559 100644 --- a/DarkRift.Server/Plugins/Listeners/Bichannel/BichannelListener.cs +++ b/DarkRift.Server/Plugins/Listeners/Bichannel/BichannelListener.cs @@ -159,6 +159,7 @@ private struct UdpSendOperation internal override bool SendUdpBuffer(EndPoint remoteEndPoint, MessageBuffer message, Action completed) { SocketAsyncEventArgs args = ObjectCache.GetSocketAsyncEventArgs(); + args.SocketError = SocketError.Success; args.BufferList = null; args.UserToken = new UdpSendOperation { callback = completed, message = message }; args.SetBuffer(message.Buffer, message.Offset, message.Count); @@ -168,7 +169,8 @@ internal override bool SendUdpBuffer(EndPoint remoteEndPoint, MessageBuffer mess bool completingAsync; try { - completingAsync = UdpListener.SendToAsync(args); + UdpListener.SendTo(message.Buffer, message.Offset, message.Count, SocketFlags.None, remoteEndPoint); + completingAsync = false; } catch (Exception e) { diff --git a/DarkRift.Server/Plugins/Listeners/Bichannel/BichannelServerConnection.cs b/DarkRift.Server/Plugins/Listeners/Bichannel/BichannelServerConnection.cs index f77e8bf..6eeb87a 100644 --- a/DarkRift.Server/Plugins/Listeners/Bichannel/BichannelServerConnection.cs +++ b/DarkRift.Server/Plugins/Listeners/Bichannel/BichannelServerConnection.cs @@ -139,6 +139,7 @@ public override bool SendMessageReliable(MessageBuffer message) BigEndianHelper.WriteBytes(header, 0, message.Count); SocketAsyncEventArgs args = ObjectCache.GetSocketAsyncEventArgs(); + args.SocketError = SocketError.Success; args.SetBuffer(null, 0, 0); args.BufferList = new List>() // TODO pooollllllll! @@ -154,7 +155,8 @@ public override bool SendMessageReliable(MessageBuffer message) bool completingAsync; try { - completingAsync = tcpSocket.SendAsync(args); + tcpSocket.Send(args.BufferList); + completingAsync = false; } catch (Exception) { From 39c0c6e0b5d35b4fb4f6290de88e1074106bb9ce Mon Sep 17 00:00:00 2001 From: "petter.hansson" Date: Wed, 26 Oct 2022 18:43:23 +0200 Subject: [PATCH 2/7] Reusable polling thread. --- DarkRift.Client/BichannelClientConnection.cs | 48 +++------- DarkRift/Dispatching/WorkerThread.cs | 95 ++++++++++++++++++++ 2 files changed, 108 insertions(+), 35 deletions(-) create mode 100644 DarkRift/Dispatching/WorkerThread.cs diff --git a/DarkRift.Client/BichannelClientConnection.cs b/DarkRift.Client/BichannelClientConnection.cs index 75b0970..5b9ffdb 100644 --- a/DarkRift.Client/BichannelClientConnection.cs +++ b/DarkRift.Client/BichannelClientConnection.cs @@ -4,6 +4,7 @@ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ +using DarkRift.Dispatching; using System; using System.Collections.Generic; using System.Linq; @@ -82,8 +83,6 @@ private ConnectionState connectionState private SocketAsyncEventArgs tcpArgs; private TcpReceiveState tcpReceiveState; private int tcpBytesTransferred; - private readonly ManualResetEvent stopPollingSignal = new ManualResetEvent(false); - private readonly ManualResetEvent stoppedPollingSignal = new ManualResetEvent(false); private enum TcpReceiveState { @@ -232,9 +231,7 @@ public override void Connect() //Calling synchronously in game loop would probably be better as it keeps messages in socket buffer instead. - var pollingThread = new Thread(DoPolling); - pollingThread.Name = nameof(BichannelClientConnection) + "." + nameof(DoPolling); - pollingThread.Start(); + PollingThread.AddWork(DoPolling); } /// @@ -257,21 +254,20 @@ public override bool SendMessageReliable(MessageBuffer message) }; args.UserToken = message; - args.Completed += TcpSendCompleted; - - bool completingAsync; try { tcpSocket.Send(args.BufferList); - completingAsync = false; + } + catch (SocketException ex) + { + args.SocketError = ex.SocketErrorCode; } catch (Exception) { return false; } - if (!completingAsync) - TcpSendCompleted(this, args); + TcpSendCompleted(args); return true; } @@ -315,9 +311,7 @@ public override bool Disconnect() connectionState = ConnectionState.Disconnected; - stopPollingSignal.Set(); - stoppedPollingSignal.WaitOne(); - + PollingThread.RemoveWork(DoPolling); tcpSocket.Shutdown(SocketShutdown.Both); @@ -337,18 +331,13 @@ public override IPEndPoint GetRemoteEndPoint(string name) private void DoPolling() { - while (!stopPollingSignal.WaitOne(1)) - { - PollReceiveHeaderAndBody(); - } - - stoppedPollingSignal.Set(); + PollReceiveTcpHeaderAndBody(); } /// /// Receives TCP header followed by a TCP body, looping the operation becomes asynchronous. /// - private void PollReceiveHeaderAndBody() + private void PollReceiveTcpHeaderAndBody() { var args = tcpArgs; @@ -514,17 +503,10 @@ private void ProcessMessage(MessageBuffer buffer) private void HandleDisconnectionDuringTcpReceive(SocketAsyncEventArgs args) { Disconnect(args.SocketError); - - /* - MessageBuffer buffer = (MessageBuffer)args.UserToken; - buffer.Dispose(); - - ObjectCache.ReturnSocketAsyncEventArgs(args); - */ } /// - /// Setup a lsiten operation for a new TCP header. + /// Setup a listen operation for a new TCP header. /// /// The socket args to use during the operation. private void SetupReceiveHeader(SocketAsyncEventArgs args) @@ -614,15 +596,12 @@ private void UdpReceiveCompleted(object sender, SocketAsyncEventArgs e) /// /// Called when a TCP send has completed. /// - /// /// - private void TcpSendCompleted(object sender, SocketAsyncEventArgs e) + private void TcpSendCompleted(SocketAsyncEventArgs e) { if (e.SocketError != SocketError.Success) Disconnect(e.SocketError); - e.Completed -= TcpSendCompleted; - //Always dispose buffer when completed! ((MessageBuffer)e.UserToken).Dispose(); @@ -657,8 +636,7 @@ private void Disconnect(SocketError error) { connectionState = ConnectionState.Disconnected; - stopPollingSignal.Set(); - stoppedPollingSignal.WaitOne(); + PollingThread.RemoveWork(DoPolling); HandleDisconnection(error); } diff --git a/DarkRift/Dispatching/WorkerThread.cs b/DarkRift/Dispatching/WorkerThread.cs new file mode 100644 index 0000000..66167a4 --- /dev/null +++ b/DarkRift/Dispatching/WorkerThread.cs @@ -0,0 +1,95 @@ +using System; +using System.Collections.Generic; +using System.Threading; + +namespace DarkRift.Dispatching +{ + internal static class PollingThread + { + public static Action ExceptionHandler { get; set; } + + private static readonly object myLock = new object(); + private static bool threadStarted; + private static readonly List workList = new List(); + private static Action[] threadsafeWorkList = new Action[0]; + + private static readonly ManualResetEvent stopEvent = new ManualResetEvent(false); + + public static void AddWork(Action work) + { + lock (myLock) + { + workList.Add(work); + threadsafeWorkList = workList.ToArray(); + + if (!threadStarted) + { + StartThread(); + } + } + } + + public static void RemoveWork(Action work) + { + lock (myLock) + { + workList.Remove(work); + threadsafeWorkList = workList.ToArray(); + } + } + + public static void StopThread() + { + stopEvent.Set(); + + //No support to restart currently since it is presently regarded superfluous (see threadStarted). + } + + private static void StartThread() + { + stopEvent.Reset(); + threadStarted = true; + + var thread = new Thread(PollingThreadLogic); + thread.IsBackground = true; + thread.Name = "DarkRift 2 Polling Thread"; + thread.Start(); + } + + private static void PollingThreadLogic() + { + var rng = new Random(); + + while (!stopEvent.WaitOne(1)) + { + Action[] work = threadsafeWorkList; + rng.Shuffle(work); + + foreach (Action item in work) + { + try + { + item?.Invoke(); + } + catch (Exception ex) + { + RemoveWork(item); + ExceptionHandler?.Invoke(ex); + } + } + } + } + + private static void Shuffle(this Random rng, T[] array) + { + int n = array.Length; + while (n > 1) + { + int k = rng.Next(n--); + T temp = array[n]; + array[n] = array[k]; + array[k] = temp; + } + } + } +} From 3df01c48101f4fe60b0f5dd6445da6e8462edc94 Mon Sep 17 00:00:00 2001 From: "petter.hansson" Date: Wed, 26 Oct 2022 19:06:26 +0200 Subject: [PATCH 3/7] Reduced code duplication. --- DarkRift.Client/BichannelClientConnection.cs | 125 +++++++------------ 1 file changed, 42 insertions(+), 83 deletions(-) diff --git a/DarkRift.Client/BichannelClientConnection.cs b/DarkRift.Client/BichannelClientConnection.cs index 5b9ffdb..3bc4d95 100644 --- a/DarkRift.Client/BichannelClientConnection.cs +++ b/DarkRift.Client/BichannelClientConnection.cs @@ -335,7 +335,8 @@ private void DoPolling() } /// - /// Receives TCP header followed by a TCP body, looping the operation becomes asynchronous. + /// Receives TCP header followed by a TCP body. The operation + /// may exit early in an incomplete state. /// private void PollReceiveTcpHeaderAndBody() { @@ -345,39 +346,8 @@ private void PollReceiveTcpHeaderAndBody() { if (tcpReceiveState == TcpReceiveState.ReceiveHeader) { - while (!IsHeaderReceiveComplete(args)) - { - UpdateBufferPointers(args); - - int bytesAvailable = tcpSocket.Available; - int bytesReceived; - - if (bytesAvailable == 0) - return; - - try - { - bytesReceived = tcpSocket.Receive(args.Buffer, args.Offset, Math.Min(bytesAvailable, args.Count), SocketFlags.None); - tcpBytesTransferred = bytesReceived; - } - catch (ObjectDisposedException) - { - HandleDisconnectionDuringTcpReceive(args); - return; - } - catch (SocketException ex) - { - if (ex.SocketErrorCode == SocketError.WouldBlock) - return; - - args.SocketError = ex.SocketErrorCode; - HandleDisconnectionDuringTcpReceive(args); - return; - } - - if (bytesReceived == 0) - return; - } + if (!PollReceiveTcpNonBlocking(args)) + return; int bodyLength = ProcessHeader(args); SetupReceiveBody(args, bodyLength); @@ -385,39 +355,8 @@ private void PollReceiveTcpHeaderAndBody() if (tcpReceiveState == TcpReceiveState.ReceiveBody) { - while (!IsBodyReceiveComplete(args)) - { - UpdateBufferPointers(args); - - int bytesAvailable = tcpSocket.Available; - int bytesReceived; - - if (bytesAvailable == 0) - return; - - try - { - bytesReceived = tcpSocket.Receive(args.Buffer, args.Offset, Math.Min(bytesAvailable, args.Count), SocketFlags.None); - tcpBytesTransferred = bytesReceived; - } - catch (ObjectDisposedException) - { - HandleDisconnectionDuringTcpReceive(args); - return; - } - catch (SocketException ex) - { - if (ex.SocketErrorCode == SocketError.WouldBlock) - return; - - args.SocketError = ex.SocketErrorCode; - HandleDisconnectionDuringTcpReceive(args); - return; - } - - if (bytesReceived == 0) - return; - } + if (!PollReceiveTcpNonBlocking(args)) + return; try { @@ -432,31 +371,51 @@ private void PollReceiveTcpHeaderAndBody() } } - /// - /// Checks if a TCP header was received in its entirety. - /// - /// The socket args used during the operation. - /// If the whole header has been received. - private bool IsHeaderReceiveComplete(SocketAsyncEventArgs args) + private bool IsReceiveComplete(SocketAsyncEventArgs args) { if (tcpBytesTransferred == 0) return false; - MessageBuffer headerBuffer = (MessageBuffer)args.UserToken; + MessageBuffer buffer = (MessageBuffer)args.UserToken; - return args.Offset + tcpBytesTransferred - headerBuffer.Offset >= headerBuffer.Count; + return args.Offset + tcpBytesTransferred - buffer.Offset >= buffer.Count; } - /// - /// Checks if a TCP body was received in its entirety. - /// - /// The socket args used during the operation. - /// If the whole body has been received. - private bool IsBodyReceiveComplete(SocketAsyncEventArgs args) + private bool PollReceiveTcpNonBlocking(SocketAsyncEventArgs args) { - MessageBuffer bodyBuffer = (MessageBuffer)args.UserToken; + while (!IsReceiveComplete(args)) + { + UpdateBufferPointers(args); + + int bytesAvailable = tcpSocket.Available; - return args.Offset + tcpBytesTransferred - bodyBuffer.Offset >= bodyBuffer.Count; + if (bytesAvailable == 0) + return false; + + try + { + tcpBytesTransferred = tcpSocket.Receive(args.Buffer, args.Offset, Math.Min(bytesAvailable, args.Count), SocketFlags.None); + } + catch (ObjectDisposedException) + { + HandleDisconnectionDuringTcpReceive(args); + return false; + } + catch (SocketException ex) + { + if (ex.SocketErrorCode == SocketError.WouldBlock) + return false; + + args.SocketError = ex.SocketErrorCode; + HandleDisconnectionDuringTcpReceive(args); + return false; + } + + if (tcpBytesTransferred == 0) + return false; + } + + return true; } /// From 673ecb6136033c79773d6f9a6a65133fb377873e Mon Sep 17 00:00:00 2001 From: "petter.hansson" Date: Wed, 26 Oct 2022 20:08:24 +0200 Subject: [PATCH 4/7] Synchronous UDP receive. --- DarkRift.Client/BichannelClientConnection.cs | 119 +++++++------------ 1 file changed, 46 insertions(+), 73 deletions(-) diff --git a/DarkRift.Client/BichannelClientConnection.cs b/DarkRift.Client/BichannelClientConnection.cs index 3bc4d95..acd32f2 100644 --- a/DarkRift.Client/BichannelClientConnection.cs +++ b/DarkRift.Client/BichannelClientConnection.cs @@ -84,6 +84,8 @@ private ConnectionState connectionState private TcpReceiveState tcpReceiveState; private int tcpBytesTransferred; + private SocketAsyncEventArgs udpArgs; + private enum TcpReceiveState { ReceiveHeader, @@ -216,16 +218,10 @@ public override void Connect() SetupReceiveHeader(tcpArgs); //Start receiving UDP packets - SocketAsyncEventArgs udpArgs = ObjectCache.GetSocketAsyncEventArgs(); + udpArgs = ObjectCache.GetSocketAsyncEventArgs(); udpArgs.BufferList = null; udpArgs.SetBuffer(new byte[ushort.MaxValue], 0, ushort.MaxValue); - udpArgs.Completed += UdpReceiveCompleted; - - bool udpCompletingAsync = udpSocket.ReceiveAsync(udpArgs); - if (!udpCompletingAsync) - UdpReceiveCompleted(this, udpArgs); - //Mark connected to allow sending connectionState = ConnectionState.Connected; @@ -267,7 +263,7 @@ public override bool SendMessageReliable(MessageBuffer message) return false; } - TcpSendCompleted(args); + SendCompleted(args); return true; } @@ -283,22 +279,17 @@ public override bool SendMessageUnreliable(MessageBuffer message) args.BufferList = null; args.SetBuffer(message.Buffer, message.Offset, message.Count); args.UserToken = message; - - args.Completed += UdpSendCompleted; - - bool completingAsync; + try { udpSocket.Send(message.Buffer, message.Offset, message.Count, SocketFlags.None); - completingAsync = false; } catch (Exception) { return false; } - if (!completingAsync) - UdpSendCompleted(this, args); + SendCompleted(args); return true; } @@ -331,17 +322,16 @@ public override IPEndPoint GetRemoteEndPoint(string name) private void DoPolling() { - PollReceiveTcpHeaderAndBody(); + PollReceiveTcpHeaderAndBody(tcpArgs); + PollReceiveUdpNonBlocking(udpArgs); } /// /// Receives TCP header followed by a TCP body. The operation /// may exit early in an incomplete state. /// - private void PollReceiveTcpHeaderAndBody() + private void PollReceiveTcpHeaderAndBody(SocketAsyncEventArgs args) { - var args = tcpArgs; - while (true) { if (tcpReceiveState == TcpReceiveState.ReceiveHeader) @@ -371,7 +361,7 @@ private void PollReceiveTcpHeaderAndBody() } } - private bool IsReceiveComplete(SocketAsyncEventArgs args) + private bool IsTcpReceiveComplete(SocketAsyncEventArgs args) { if (tcpBytesTransferred == 0) return false; @@ -383,7 +373,7 @@ private bool IsReceiveComplete(SocketAsyncEventArgs args) private bool PollReceiveTcpNonBlocking(SocketAsyncEventArgs args) { - while (!IsReceiveComplete(args)) + while (!IsTcpReceiveComplete(args)) { UpdateBufferPointers(args); @@ -507,78 +497,55 @@ private void UpdateBufferPointers(SocketAsyncEventArgs args) { /// /// Called when a UDP message arrives. /// - /// /// - private void UdpReceiveCompleted(object sender, SocketAsyncEventArgs e) + private void PollReceiveUdpNonBlocking(SocketAsyncEventArgs e) { - bool completingAsync; - do + while (true) { - //If we received a Success then process it - if (e.SocketError == SocketError.Success) - { - using (MessageBuffer buffer = MessageBuffer.Create(e.BytesTransferred)) - { - Buffer.BlockCopy(e.Buffer, 0, buffer.Buffer, buffer.Offset, e.BytesTransferred); - buffer.Count = e.BytesTransferred; - - completingAsync = udpSocket.ReceiveAsync(e); - - //Length of 0 must be a hole punching packet - if (buffer.Count != 0) - HandleMessageReceived(buffer, SendMode.Unreliable); - } - } + int bytesAvailable = udpSocket.Available; + if (bytesAvailable == 0) + return; - //Ignore ConnectionReset (ICMP Port Unreachable) since NATs will return that when they get - //the punchthrough packets and they've not already been opened - else if (e.SocketError == SocketError.ConnectionReset) + int bytesTransferred; + try { - completingAsync = udpSocket.ReceiveAsync(e); + bytesTransferred = udpSocket.Receive(e.Buffer, ushort.MaxValue, SocketFlags.None); } - - //Anything else is probably bad news - else + catch (SocketException ex) { - Disconnect(e.SocketError); - - e.Completed -= UdpReceiveCompleted; - ObjectCache.ReturnSocketAsyncEventArgs(e); + if (ex.SocketErrorCode == SocketError.ConnectionReset) + { + //Ignore ConnectionReset (ICMP Port Unreachable) since NATs will return that when they get + //the punchthrough packets and they've not already been opened + return; + } - // Leave the loop + //Anything else is probably bad news + Disconnect(e.SocketError); return; } + + using (MessageBuffer buffer = MessageBuffer.Create(bytesTransferred)) + { + Buffer.BlockCopy(e.Buffer, 0, buffer.Buffer, buffer.Offset, bytesTransferred); + buffer.Count = bytesTransferred; - } while (!completingAsync); - } - - /// - /// Called when a TCP send has completed. - /// - /// - private void TcpSendCompleted(SocketAsyncEventArgs e) - { - if (e.SocketError != SocketError.Success) - Disconnect(e.SocketError); - - //Always dispose buffer when completed! - ((MessageBuffer)e.UserToken).Dispose(); - - ObjectCache.ReturnSocketAsyncEventArgs(e); + //Length of 0 must be a hole punching packet + if (buffer.Count != 0) + HandleMessageReceived(buffer, SendMode.Unreliable); + } + } } /// - /// Called when a UDP send has completed. + /// Called when a TCP or UDP send has completed. /// - /// /// - private void UdpSendCompleted(object sender, SocketAsyncEventArgs e) + private void SendCompleted(SocketAsyncEventArgs e) { if (e.SocketError != SocketError.Success) Disconnect(e.SocketError); - e.Completed -= UdpSendCompleted; - //Always dispose buffer when completed! ((MessageBuffer)e.UserToken).Dispose(); @@ -629,6 +596,12 @@ protected override void Dispose(bool disposing) ObjectCache.ReturnSocketAsyncEventArgs(tcpArgs); tcpArgs = null; } + if (udpArgs != null) + { + udpArgs.SetBuffer(null, 0, 0); + ObjectCache.ReturnSocketAsyncEventArgs(udpArgs); + udpArgs = null; + } } disposedValue = true; From 51ccfd7552e3d5205d5fa39391ad7d35c7a4538f Mon Sep 17 00:00:00 2001 From: "petter.hansson" Date: Wed, 26 Oct 2022 23:17:10 +0200 Subject: [PATCH 5/7] Split up UDP and TCP synch socket handling to a) make reusable and b) make code easier to follow. --- DarkRift.Client/BichannelClientConnection.cs | 380 ++----------------- DarkRift/SynchronousTcpSocket.cs | 309 +++++++++++++++ DarkRift/SynchronousUdpSocket.cs | 152 ++++++++ 3 files changed, 498 insertions(+), 343 deletions(-) create mode 100644 DarkRift/SynchronousTcpSocket.cs create mode 100644 DarkRift/SynchronousUdpSocket.cs diff --git a/DarkRift.Client/BichannelClientConnection.cs b/DarkRift.Client/BichannelClientConnection.cs index acd32f2..f7ef066 100644 --- a/DarkRift.Client/BichannelClientConnection.cs +++ b/DarkRift.Client/BichannelClientConnection.cs @@ -34,8 +34,8 @@ public sealed class BichannelClientConnection : NetworkClientConnection, IDispos /// Whether Nagel's algorithm should be disabled or not. /// public bool NoDelay { - get => tcpSocket.NoDelay; - set => tcpSocket.NoDelay = value; + get => tcp.Socket.NoDelay; + set => tcp.Socket.NoDelay = value; } /// @@ -70,27 +70,8 @@ private ConnectionState connectionState private ConnectionState lockedConnectionState; private readonly object myLock = new object(); - /// - /// The socket used in TCP communication. - /// - private readonly Socket tcpSocket; - - /// - /// The socket used in UDP communication. - /// - private readonly Socket udpSocket; - - private SocketAsyncEventArgs tcpArgs; - private TcpReceiveState tcpReceiveState; - private int tcpBytesTransferred; - - private SocketAsyncEventArgs udpArgs; - - private enum TcpReceiveState - { - ReceiveHeader, - ReceiveBody, - } + private readonly SynchronousTcpSocket tcp; + private readonly SynchronousUdpSocket udp; /// /// Creates a new bichannel client. @@ -116,8 +97,10 @@ public BichannelClientConnection(IPAddress ipAddress, int tcpPort, int udpPort, RemoteTcpEndPoint = new IPEndPoint(ipAddress, tcpPort); RemoteUdpEndPoint = new IPEndPoint(ipAddress, udpPort); - tcpSocket = new Socket(ipAddress.AddressFamily, SocketType.Stream, ProtocolType.Tcp); - udpSocket = new Socket(tcpSocket.AddressFamily, SocketType.Dgram, ProtocolType.Udp); + var tcpSocket = new Socket(ipAddress.AddressFamily, SocketType.Stream, ProtocolType.Tcp); + tcp = new SynchronousTcpSocket(tcpSocket, Disconnect, HandleMessageReceived); + var udpSocket = new Socket(tcpSocket.AddressFamily, SocketType.Dgram, ProtocolType.Udp); + udp = new SynchronousUdpSocket(udpSocket, Disconnect, HandleMessageReceived); NoDelay = noDelay; } @@ -138,9 +121,11 @@ public BichannelClientConnection(IPVersion ipVersion, IPAddress ipAddress, int p RemoteTcpEndPoint = new IPEndPoint(ipAddress, port); RemoteUdpEndPoint = new IPEndPoint(ipAddress, port); - tcpSocket = new Socket(addressFamily, SocketType.Stream, ProtocolType.Tcp); - udpSocket = new Socket(tcpSocket.AddressFamily, SocketType.Dgram, ProtocolType.Udp); - + var tcpSocket = new Socket(addressFamily, SocketType.Stream, ProtocolType.Tcp); + tcp = new SynchronousTcpSocket(tcpSocket, Disconnect, HandleMessageReceived); + var udpSocket = new Socket(tcpSocket.AddressFamily, SocketType.Dgram, ProtocolType.Udp); + udp = new SynchronousUdpSocket(udpSocket, Disconnect, HandleMessageReceived); + NoDelay = noDelay; } @@ -154,7 +139,7 @@ public override void Connect() //Connect TCP try { - tcpSocket.Connect(RemoteTcpEndPoint); + tcp.Socket.Connect(RemoteTcpEndPoint); } catch (SocketException e) { @@ -164,8 +149,8 @@ public override void Connect() try { //Bind UDP to a free port - udpSocket.Bind(new IPEndPoint(((IPEndPoint)tcpSocket.LocalEndPoint).Address, 0)); - udpSocket.Connect(RemoteUdpEndPoint); + udp.Socket.Bind(new IPEndPoint(((IPEndPoint)tcp.Socket.LocalEndPoint).Address, 0)); + udp.Socket.Connect(RemoteUdpEndPoint); } catch (SocketException e) { @@ -174,28 +159,28 @@ public override void Connect() //Receive auth token from TCP byte[] buffer = new byte[9]; - tcpSocket.ReceiveTimeout = 5000; - int receivedTcp = tcpSocket.Receive(buffer); - tcpSocket.ReceiveTimeout = 0; //Reset to infinite + tcp.Socket.ReceiveTimeout = 5000; + int receivedTcp = tcp.Socket.Receive(buffer); + tcp.Socket.ReceiveTimeout = 0; //Reset to infinite if (receivedTcp != 9 || buffer[0] != 0) { - tcpSocket.Shutdown(SocketShutdown.Both); + tcp.Shutdown(); throw new DarkRiftConnectionException("Timeout waiting for auth token from server.", SocketError.ConnectionAborted); } //Transmit token back over UDP to server listening port - udpSocket.Send(buffer); + udp.Socket.Send(buffer); //Receive response from server to initiate the connection buffer = new byte[1]; - udpSocket.ReceiveTimeout = 5000; - int receivedUdp = udpSocket.Receive(buffer); - udpSocket.ReceiveTimeout = 0; //Reset to infinite + udp.Socket.ReceiveTimeout = 5000; + int receivedUdp = udp.Socket.Receive(buffer); + udp.Socket.ReceiveTimeout = 0; //Reset to infinite if (receivedUdp != 1 || buffer[0] != 0) { - tcpSocket.Shutdown(SocketShutdown.Both); + tcp.Shutdown(); throw new DarkRiftConnectionException("Timeout waiting for UDP acknowledgement from server.", SocketError.ConnectionAborted); } } @@ -212,15 +197,8 @@ public override void Connect() throw; } - //Setup the TCP socket to receive a header - tcpArgs = ObjectCache.GetSocketAsyncEventArgs(); - tcpArgs.BufferList = null; - SetupReceiveHeader(tcpArgs); - - //Start receiving UDP packets - udpArgs = ObjectCache.GetSocketAsyncEventArgs(); - udpArgs.BufferList = null; - udpArgs.SetBuffer(new byte[ushort.MaxValue], 0, ushort.MaxValue); + tcp.ResetBuffers(); + udp.ResetBuffers(); //Mark connected to allow sending connectionState = ConnectionState.Connected; @@ -236,36 +214,7 @@ public override bool SendMessageReliable(MessageBuffer message) if (connectionState == ConnectionState.Disconnected) return false; - byte[] header = new byte[4]; - BigEndianHelper.WriteBytes(header, 0, message.Count); - - SocketAsyncEventArgs args = ObjectCache.GetSocketAsyncEventArgs(); - args.SocketError = SocketError.Success; - - args.SetBuffer(null, 0, 0); - args.BufferList = new List>() - { - new ArraySegment(header), - new ArraySegment(message.Buffer, message.Offset, message.Count) - }; - args.UserToken = message; - - try - { - tcpSocket.Send(args.BufferList); - } - catch (SocketException ex) - { - args.SocketError = ex.SocketErrorCode; - } - catch (Exception) - { - return false; - } - - SendCompleted(args); - - return true; + return tcp.SendMessageReliable(message); } /// @@ -274,24 +223,7 @@ public override bool SendMessageUnreliable(MessageBuffer message) if (connectionState == ConnectionState.Disconnected) return false; - SocketAsyncEventArgs args = ObjectCache.GetSocketAsyncEventArgs(); - args.SocketError = SocketError.Success; - args.BufferList = null; - args.SetBuffer(message.Buffer, message.Offset, message.Count); - args.UserToken = message; - - try - { - udpSocket.Send(message.Buffer, message.Offset, message.Count, SocketFlags.None); - } - catch (Exception) - { - return false; - } - - SendCompleted(args); - - return true; + return udp.SendMessageUnreliable(message); } /// @@ -304,7 +236,7 @@ public override bool Disconnect() PollingThread.RemoveWork(DoPolling); - tcpSocket.Shutdown(SocketShutdown.Both); + tcp.Shutdown(); return true; } @@ -322,234 +254,8 @@ public override IPEndPoint GetRemoteEndPoint(string name) private void DoPolling() { - PollReceiveTcpHeaderAndBody(tcpArgs); - PollReceiveUdpNonBlocking(udpArgs); - } - - /// - /// Receives TCP header followed by a TCP body. The operation - /// may exit early in an incomplete state. - /// - private void PollReceiveTcpHeaderAndBody(SocketAsyncEventArgs args) - { - while (true) - { - if (tcpReceiveState == TcpReceiveState.ReceiveHeader) - { - if (!PollReceiveTcpNonBlocking(args)) - return; - - int bodyLength = ProcessHeader(args); - SetupReceiveBody(args, bodyLength); - } - - if (tcpReceiveState == TcpReceiveState.ReceiveBody) - { - if (!PollReceiveTcpNonBlocking(args)) - return; - - try - { - MessageBuffer bodyBuffer = ProcessBody(args); - ProcessMessage(bodyBuffer); - } - finally - { - SetupReceiveHeader(tcpArgs); - } - } - } - } - - private bool IsTcpReceiveComplete(SocketAsyncEventArgs args) - { - if (tcpBytesTransferred == 0) - return false; - - MessageBuffer buffer = (MessageBuffer)args.UserToken; - - return args.Offset + tcpBytesTransferred - buffer.Offset >= buffer.Count; - } - - private bool PollReceiveTcpNonBlocking(SocketAsyncEventArgs args) - { - while (!IsTcpReceiveComplete(args)) - { - UpdateBufferPointers(args); - - int bytesAvailable = tcpSocket.Available; - - if (bytesAvailable == 0) - return false; - - try - { - tcpBytesTransferred = tcpSocket.Receive(args.Buffer, args.Offset, Math.Min(bytesAvailable, args.Count), SocketFlags.None); - } - catch (ObjectDisposedException) - { - HandleDisconnectionDuringTcpReceive(args); - return false; - } - catch (SocketException ex) - { - if (ex.SocketErrorCode == SocketError.WouldBlock) - return false; - - args.SocketError = ex.SocketErrorCode; - HandleDisconnectionDuringTcpReceive(args); - return false; - } - - if (tcpBytesTransferred == 0) - return false; - } - - return true; - } - - /// - /// Processes a TCP header received. - /// - /// The socket args used during the operation. - /// The number of bytes in the body. - private int ProcessHeader(SocketAsyncEventArgs args) - { - MessageBuffer headerBuffer = (MessageBuffer)args.UserToken; - - int bodyLength = BigEndianHelper.ReadInt32(headerBuffer.Buffer, headerBuffer.Offset); - - headerBuffer.Dispose(); - - return bodyLength; - } - - /// - /// Processes a TCP body received. - /// - /// The socket args used during the operation. - /// The buffer received. - private MessageBuffer ProcessBody(SocketAsyncEventArgs args) - { - return (MessageBuffer)args.UserToken; - } - - /// - /// Invokes message recevied events and cleans up. - /// - /// The TCP body received. - private void ProcessMessage(MessageBuffer buffer) - { - HandleMessageReceived(buffer, SendMode.Reliable); - - buffer.Dispose(); - } - - /// - /// Handles a disconnection while receiving a TCP header. - /// - /// The socket args used during the operation. - private void HandleDisconnectionDuringTcpReceive(SocketAsyncEventArgs args) - { - Disconnect(args.SocketError); - } - - /// - /// Setup a listen operation for a new TCP header. - /// - /// The socket args to use during the operation. - private void SetupReceiveHeader(SocketAsyncEventArgs args) - { - tcpBytesTransferred = 0; - tcpReceiveState = TcpReceiveState.ReceiveHeader; - - MessageBuffer headerBuffer = MessageBuffer.Create(4); - - args.SetBuffer(headerBuffer.Buffer, headerBuffer.Offset, 4); - args.UserToken = headerBuffer; - } - - /// - /// Setup a listen operation for a new TCP body. - /// - /// The socket args to use during the operation. - /// The number of bytes in the body. - private void SetupReceiveBody(SocketAsyncEventArgs args, int length) - { - tcpBytesTransferred = 0; - tcpReceiveState = TcpReceiveState.ReceiveBody; - - MessageBuffer bodyBuffer = MessageBuffer.Create(length); - bodyBuffer.Count = length; - - args.SetBuffer(bodyBuffer.Buffer, bodyBuffer.Offset, length); - args.UserToken = bodyBuffer; - } - - /// - /// Updates the pointers on the buffer to continue a receive operation. - /// - /// The socket args to update. - private void UpdateBufferPointers(SocketAsyncEventArgs args) { - args.SetBuffer(args.Offset + tcpBytesTransferred, args.Count - tcpBytesTransferred); - } - - /// - /// Called when a UDP message arrives. - /// - /// - private void PollReceiveUdpNonBlocking(SocketAsyncEventArgs e) - { - while (true) - { - int bytesAvailable = udpSocket.Available; - if (bytesAvailable == 0) - return; - - int bytesTransferred; - try - { - bytesTransferred = udpSocket.Receive(e.Buffer, ushort.MaxValue, SocketFlags.None); - } - catch (SocketException ex) - { - if (ex.SocketErrorCode == SocketError.ConnectionReset) - { - //Ignore ConnectionReset (ICMP Port Unreachable) since NATs will return that when they get - //the punchthrough packets and they've not already been opened - return; - } - - //Anything else is probably bad news - Disconnect(e.SocketError); - return; - } - - using (MessageBuffer buffer = MessageBuffer.Create(bytesTransferred)) - { - Buffer.BlockCopy(e.Buffer, 0, buffer.Buffer, buffer.Offset, bytesTransferred); - buffer.Count = bytesTransferred; - - //Length of 0 must be a hole punching packet - if (buffer.Count != 0) - HandleMessageReceived(buffer, SendMode.Unreliable); - } - } - } - - /// - /// Called when a TCP or UDP send has completed. - /// - /// - private void SendCompleted(SocketAsyncEventArgs e) - { - if (e.SocketError != SocketError.Success) - Disconnect(e.SocketError); - - //Always dispose buffer when completed! - ((MessageBuffer)e.UserToken).Dispose(); - - ObjectCache.ReturnSocketAsyncEventArgs(e); + tcp.PollReceiveHeaderAndBodyNonBlocking(); + udp.PollReceiveBodyNonBlocking(); } /// @@ -585,23 +291,11 @@ protected override void Dispose(bool disposing) { Disconnect(); - tcpSocket.Close(); - udpSocket.Close(); - - if (tcpArgs != null) - { - MessageBuffer buffer = (MessageBuffer)tcpArgs.UserToken; - buffer.Dispose(); - - ObjectCache.ReturnSocketAsyncEventArgs(tcpArgs); - tcpArgs = null; - } - if (udpArgs != null) - { - udpArgs.SetBuffer(null, 0, 0); - ObjectCache.ReturnSocketAsyncEventArgs(udpArgs); - udpArgs = null; - } + tcp.Socket.Close(); + udp.Socket.Close(); + + tcp.Dispose(); + udp.Dispose(); } disposedValue = true; diff --git a/DarkRift/SynchronousTcpSocket.cs b/DarkRift/SynchronousTcpSocket.cs new file mode 100644 index 0000000..2329e4b --- /dev/null +++ b/DarkRift/SynchronousTcpSocket.cs @@ -0,0 +1,309 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net.Sockets; +using System.Text; + +namespace DarkRift +{ + internal class SynchronousTcpSocket : IDisposable + { + public Socket Socket { get; private set; } + + private readonly Action disconnect; + private readonly Action handleMessage; + + private SocketAsyncEventArgs tcpArgs; + private TcpReceiveState tcpReceiveState; + private int tcpBytesTransferred; + private bool disposedValue; + + private enum TcpReceiveState + { + ReceiveHeader, + ReceiveBody, + } + + public SynchronousTcpSocket(Socket socket, Action disconnect, Action handleMessage) + { + Socket = socket; + this.disconnect = disconnect; + this.handleMessage = handleMessage; + + //TODO: be able to call ResetBuffers() + } + + public void ResetBuffers() + { + //Setup the TCP socket to receive a header + tcpArgs = ObjectCache.GetSocketAsyncEventArgs(); + tcpArgs.BufferList = null; + + SetupReceiveHeader(tcpArgs); + } + + public void Shutdown() + { + Socket.Shutdown(SocketShutdown.Both); + } + + public bool SendMessageReliable(MessageBuffer message) + { + byte[] header = new byte[4]; + BigEndianHelper.WriteBytes(header, 0, message.Count); + + SocketAsyncEventArgs args = ObjectCache.GetSocketAsyncEventArgs(); + args.SocketError = SocketError.Success; + + args.SetBuffer(null, 0, 0); + args.BufferList = new List>() + { + new ArraySegment(header), + new ArraySegment(message.Buffer, message.Offset, message.Count) + }; + args.UserToken = message; + + try + { + Socket.Send(args.BufferList); + } + catch (SocketException ex) + { + args.SocketError = ex.SocketErrorCode; + } + catch (Exception) + { + return false; + } + + SendCompleted(args); + + return true; + } + + /// + /// Receives TCP header followed by a TCP body. The operation + /// may exit early in an incomplete state. + /// + public void PollReceiveHeaderAndBodyNonBlocking() + { + var args = tcpArgs; + + while (true) + { + if (tcpReceiveState == TcpReceiveState.ReceiveHeader) + { + if (!PollReceiveTcpNonBlocking(args)) + return; + + int bodyLength = ProcessHeader(args); + SetupReceiveBody(args, bodyLength); + } + + if (tcpReceiveState == TcpReceiveState.ReceiveBody) + { + if (!PollReceiveTcpNonBlocking(args)) + return; + + try + { + MessageBuffer bodyBuffer = ProcessBody(args); + ProcessMessage(bodyBuffer); + } + finally + { + SetupReceiveHeader(tcpArgs); + } + } + } + } + + private bool IsTcpReceiveComplete(SocketAsyncEventArgs args) + { + if (tcpBytesTransferred == 0) + return false; + + MessageBuffer buffer = (MessageBuffer)args.UserToken; + + return args.Offset + tcpBytesTransferred - buffer.Offset >= buffer.Count; + } + + private bool PollReceiveTcpNonBlocking(SocketAsyncEventArgs args) + { + while (!IsTcpReceiveComplete(args)) + { + UpdateBufferPointers(args); + + int bytesAvailable = Socket.Available; + + if (bytesAvailable == 0) + return false; + + try + { + args.SocketError = SocketError.Success; + tcpBytesTransferred = Socket.Receive(args.Buffer, args.Offset, Math.Min(bytesAvailable, args.Count), SocketFlags.None); + } + catch (ObjectDisposedException) + { + HandleDisconnectionDuringTcpReceive(args); + return false; + } + catch (SocketException ex) + { + if (ex.SocketErrorCode == SocketError.WouldBlock) + return false; + + args.SocketError = ex.SocketErrorCode; + HandleDisconnectionDuringTcpReceive(args); + return false; + } + + if (tcpBytesTransferred == 0) + return false; + } + + return true; + } + + /// + /// Processes a TCP header received. + /// + /// The socket args used during the operation. + /// The number of bytes in the body. + private int ProcessHeader(SocketAsyncEventArgs args) + { + MessageBuffer headerBuffer = (MessageBuffer)args.UserToken; + + int bodyLength = BigEndianHelper.ReadInt32(headerBuffer.Buffer, headerBuffer.Offset); + + headerBuffer.Dispose(); + + return bodyLength; + } + + /// + /// Processes a TCP body received. + /// + /// The socket args used during the operation. + /// The buffer received. + private MessageBuffer ProcessBody(SocketAsyncEventArgs args) + { + return (MessageBuffer)args.UserToken; + } + + /// + /// Invokes message recevied events and cleans up. + /// + /// The TCP body received. + private void ProcessMessage(MessageBuffer buffer) + { + handleMessage(buffer, SendMode.Reliable); + + buffer.Dispose(); + } + + /// + /// Handles a disconnection while receiving a TCP header. + /// + /// The socket args used during the operation. + private void HandleDisconnectionDuringTcpReceive(SocketAsyncEventArgs args) + { + disconnect(args.SocketError); + } + + /// + /// Setup a listen operation for a new TCP header. + /// + /// The socket args to use during the operation. + private void SetupReceiveHeader(SocketAsyncEventArgs args) + { + tcpBytesTransferred = 0; + tcpReceiveState = TcpReceiveState.ReceiveHeader; + + MessageBuffer headerBuffer = MessageBuffer.Create(4); + + args.SetBuffer(headerBuffer.Buffer, headerBuffer.Offset, 4); + args.UserToken = headerBuffer; + } + + /// + /// Setup a listen operation for a new TCP body. + /// + /// The socket args to use during the operation. + /// The number of bytes in the body. + private void SetupReceiveBody(SocketAsyncEventArgs args, int length) + { + tcpBytesTransferred = 0; + tcpReceiveState = TcpReceiveState.ReceiveBody; + + MessageBuffer bodyBuffer = MessageBuffer.Create(length); + bodyBuffer.Count = length; + + args.SetBuffer(bodyBuffer.Buffer, bodyBuffer.Offset, length); + args.UserToken = bodyBuffer; + } + + /// + /// Updates the pointers on the buffer to continue a receive operation. + /// + /// The socket args to update. + private void UpdateBufferPointers(SocketAsyncEventArgs args) + { + args.SetBuffer(args.Offset + tcpBytesTransferred, args.Count - tcpBytesTransferred); + } + + /// + /// Called when a TCP or UDP send has completed. + /// + /// + private void SendCompleted(SocketAsyncEventArgs e) + { + if (e.SocketError != SocketError.Success) + disconnect(e.SocketError); + + //Always dispose buffer when completed! + ((MessageBuffer)e.UserToken).Dispose(); + + ObjectCache.ReturnSocketAsyncEventArgs(e); + } + + protected virtual void Dispose(bool disposing) + { + if (!disposedValue) + { + if (disposing) + { + Socket.Close(); + + if (tcpArgs != null) + { + MessageBuffer buffer = (MessageBuffer)tcpArgs.UserToken; + buffer.Dispose(); + + ObjectCache.ReturnSocketAsyncEventArgs(tcpArgs); + tcpArgs = null; + } + } + + // TODO: free unmanaged resources (unmanaged objects) and override finalizer + // TODO: set large fields to null + disposedValue = true; + } + } + + // // TODO: override finalizer only if 'Dispose(bool disposing)' has code to free unmanaged resources + // ~SynchronousTcpSocket() + // { + // // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method + // Dispose(disposing: false); + // } + + public void Dispose() + { + // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method + Dispose(disposing: true); + GC.SuppressFinalize(this); + } + } +} diff --git a/DarkRift/SynchronousUdpSocket.cs b/DarkRift/SynchronousUdpSocket.cs new file mode 100644 index 0000000..33cb440 --- /dev/null +++ b/DarkRift/SynchronousUdpSocket.cs @@ -0,0 +1,152 @@ +using System; +using System.Collections.Generic; +using System.Data; +using System.Linq; +using System.Net.Sockets; +using System.Text; + +namespace DarkRift +{ + internal class SynchronousUdpSocket : IDisposable + { + public Socket Socket { get; private set; } + + private readonly Action disconnect; + private readonly Action handleMessage; + + private SocketAsyncEventArgs udpArgs; + private bool disposedValue; + + public SynchronousUdpSocket(Socket socket, Action disconnect, Action handleMessage) + { + Socket = socket; + this.disconnect = disconnect; + this.handleMessage = handleMessage; + + //TODO: be able to call ResetBuffers() + } + + public void ResetBuffers() + { + udpArgs = ObjectCache.GetSocketAsyncEventArgs(); + udpArgs.BufferList = null; + udpArgs.SetBuffer(new byte[ushort.MaxValue], 0, ushort.MaxValue); + } + + public bool SendMessageUnreliable(MessageBuffer message) + { + SocketAsyncEventArgs args = ObjectCache.GetSocketAsyncEventArgs(); + args.SocketError = SocketError.Success; + args.BufferList = null; + args.SetBuffer(message.Buffer, message.Offset, message.Count); + args.UserToken = message; + + try + { + Socket.Send(message.Buffer, message.Offset, message.Count, SocketFlags.None); + } + catch (Exception) + { + return false; + } + + SendCompleted(args); + + return true; + } + + /// + /// Called when a UDP message arrives. + /// + public void PollReceiveBodyNonBlocking() + { + var args = udpArgs; + + while (true) + { + int bytesAvailable = Socket.Available; + if (bytesAvailable == 0) + return; + + int bytesTransferred; + try + { + args.SocketError = SocketError.Success; + bytesTransferred = Socket.Receive(args.Buffer, ushort.MaxValue, SocketFlags.None); + } + catch (SocketException ex) + { + if (ex.SocketErrorCode == SocketError.ConnectionReset) + { + //Ignore ConnectionReset (ICMP Port Unreachable) since NATs will return that when they get + //the punchthrough packets and they've not already been opened + return; + } + + args.SocketError = ex.SocketErrorCode; + disconnect(args.SocketError); + return; + } + + using (MessageBuffer buffer = MessageBuffer.Create(bytesTransferred)) + { + Buffer.BlockCopy(args.Buffer, 0, buffer.Buffer, buffer.Offset, bytesTransferred); + buffer.Count = bytesTransferred; + + //Length of 0 must be a hole punching packet + if (buffer.Count != 0) + handleMessage(buffer, SendMode.Unreliable); + } + } + } + + /// + /// Called when a TCP or UDP send has completed. + /// + /// + private void SendCompleted(SocketAsyncEventArgs e) + { + if (e.SocketError != SocketError.Success) + disconnect(e.SocketError); + + //Always dispose buffer when completed! + ((MessageBuffer)e.UserToken).Dispose(); + + ObjectCache.ReturnSocketAsyncEventArgs(e); + } + + protected virtual void Dispose(bool disposing) + { + if (!disposedValue) + { + if (disposing) + { + if (udpArgs != null) + { + udpArgs.SetBuffer(null, 0, 0); + ObjectCache.ReturnSocketAsyncEventArgs(udpArgs); + udpArgs = null; + } + } + + // TODO: free unmanaged resources (unmanaged objects) and override finalizer + // TODO: set large fields to null + disposedValue = true; + } + } + + // // TODO: override finalizer only if 'Dispose(bool disposing)' has code to free unmanaged resources + // ~SynchronousUdpSocket() + // { + // // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method + // Dispose(disposing: false); + // } + + public void Dispose() + { + // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method + Dispose(disposing: true); + GC.SuppressFinalize(this); + } + } +} From 963edafa58a5a4e59e54f896c4a1cab4a87cbe45 Mon Sep 17 00:00:00 2001 From: "petter.hansson" Date: Sat, 29 Oct 2022 13:30:25 +0200 Subject: [PATCH 6/7] Ported BichannelServerConnection to synchronous sockets. --- DarkRift.Client/BichannelClientConnection.cs | 14 +- .../Bichannel/BichannelServerConnection.cs | 503 ++++-------------- .../{WorkerThread.cs => PollingThread.cs} | 51 +- DarkRift/SynchronousTcpSocket.cs | 18 +- 4 files changed, 158 insertions(+), 428 deletions(-) rename DarkRift/Dispatching/{WorkerThread.cs => PollingThread.cs} (66%) diff --git a/DarkRift.Client/BichannelClientConnection.cs b/DarkRift.Client/BichannelClientConnection.cs index f7ef066..bf7b8e3 100644 --- a/DarkRift.Client/BichannelClientConnection.cs +++ b/DarkRift.Client/BichannelClientConnection.cs @@ -197,14 +197,15 @@ public override void Connect() throw; } + //tcp.Socket.Blocking = false; + //tcp.CheckAvailable = false; + tcp.ResetBuffers(); udp.ResetBuffers(); //Mark connected to allow sending connectionState = ConnectionState.Connected; - - //Calling synchronously in game loop would probably be better as it keeps messages in socket buffer instead. - + PollingThread.AddWork(DoPolling); } @@ -252,7 +253,10 @@ public override IPEndPoint GetRemoteEndPoint(string name) throw new ArgumentException("Endpoint name must either be TCP or UDP"); } - private void DoPolling() + /// + /// Explicitly performs a step of message polling. + /// + public void DoPolling() { tcp.PollReceiveHeaderAndBodyNonBlocking(); udp.PollReceiveBodyNonBlocking(); @@ -291,6 +295,8 @@ protected override void Dispose(bool disposing) { Disconnect(); + PollingThread.RemoveWork(DoPolling); + tcp.Socket.Close(); udp.Socket.Close(); diff --git a/DarkRift.Server/Plugins/Listeners/Bichannel/BichannelServerConnection.cs b/DarkRift.Server/Plugins/Listeners/Bichannel/BichannelServerConnection.cs index 6eeb87a..81129c9 100644 --- a/DarkRift.Server/Plugins/Listeners/Bichannel/BichannelServerConnection.cs +++ b/DarkRift.Server/Plugins/Listeners/Bichannel/BichannelServerConnection.cs @@ -4,9 +4,11 @@ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ +using DarkRift.Dispatching; using DarkRift.Server.Metrics; using System; using System.Collections.Generic; +using System.Data; using System.Linq; using System.Net; using System.Net.Sockets; @@ -22,12 +24,26 @@ internal sealed class BichannelServerConnection : NetworkServerConnection /// /// Is this client able to send or not. /// - public bool CanSend { get; private set; } + public bool CanSend + { + get + { + lock (myLock) + return lockedCanSend; + } + } /// /// Is this client currently listening for messages or not. /// - public bool IsListening { get; private set; } + public bool IsListening + { + get + { + lock (myLock) + return lockedIsListening; + } + } /// /// The end point of the remote client on TCP. @@ -43,8 +59,8 @@ internal sealed class BichannelServerConnection : NetworkServerConnection /// Whether Nagel's algorithm should be disabled or not. /// public bool NoDelay { - get => tcpSocket.NoDelay; - set => tcpSocket.NoDelay = value; + get => tcp.Socket.NoDelay; + set => tcp.Socket.NoDelay = value; } /// @@ -58,11 +74,6 @@ public bool NoDelay { /// public override IEnumerable RemoteEndPoints => new IPEndPoint[2] { RemoteTcpEndPoint, RemoteUdpEndPoint }; - /// - /// The socket used in TCP communication. - /// - private readonly Socket tcpSocket; - /// /// The listener used in UDP communication. /// @@ -88,16 +99,22 @@ public bool NoDelay { /// private readonly ICounterMetric bytesReceivedCounterUdp; + private bool lockedIsListening; + private bool lockedCanSend; + private readonly object myLock = new object(); + + private readonly SynchronousTcpSocket tcp; + internal BichannelServerConnection(Socket tcpSocket, BichannelListenerBase networkListener, IPEndPoint udpEndPoint, long authToken, MetricsCollector metricsCollector) { - this.tcpSocket = tcpSocket; + this.tcp = new SynchronousTcpSocket(tcpSocket, UnregisterAndDisconnect, HandleTcpMessage); this.networkListener = networkListener; this.RemoteTcpEndPoint = (IPEndPoint)tcpSocket.RemoteEndPoint; this.RemoteUdpEndPoint = udpEndPoint; this.AuthToken = authToken; //Mark connected to allow sending - CanSend = true; + lockedCanSend = true; TaggedMetricBuilder bytesSentCounter = metricsCollector.Counter("bytes_sent", "The number of bytes sent to clients by the listener.", "protocol"); TaggedMetricBuilder bytesReceivedCounter = metricsCollector.Counter("bytes_received", "The number of bytes received from clients by the listener.", "protocol"); @@ -105,6 +122,9 @@ internal BichannelServerConnection(Socket tcpSocket, BichannelListenerBase netwo bytesSentCounterUdp = bytesSentCounter.WithTags("udp"); bytesReceivedCounterTcp = bytesReceivedCounter.WithTags("tcp"); bytesReceivedCounterUdp = bytesReceivedCounter.WithTags("udp"); + + tcp.CheckBodyLength = CheckTcpBodyLength; + tcp.OnSendCompleted = TcpSendCompleted; } /// @@ -112,21 +132,19 @@ internal BichannelServerConnection(Socket tcpSocket, BichannelListenerBase netwo /// public override void StartListening() { - //Setup the TCP socket to receive a header - SocketAsyncEventArgs tcpArgs = ObjectCache.GetSocketAsyncEventArgs(); - tcpArgs.BufferList = null; + //tcp.Socket.Blocking = false; + //tcp.CheckAvailable = false; - SetupReceiveHeader(tcpArgs); - // TODO can throw an object disposed exception here if we connect and disconnect very quickly - bool headerCompletingAsync = tcpSocket.ReceiveAsync(tcpArgs); - if (!headerCompletingAsync) - AsyncReceiveHeaderCompleted(this, tcpArgs); + tcp.ResetBuffers(); //Register for UDP Messages networkListener.RegisterUdpConnection(this); //Mark as listening - IsListening = true; + lock (myLock) + lockedIsListening = true; + + PollingThread.AddWork(DoPolling); } /// @@ -135,38 +153,7 @@ public override bool SendMessageReliable(MessageBuffer message) if (!CanSend) return false; - byte[] header = new byte[4]; // TODO pool! - BigEndianHelper.WriteBytes(header, 0, message.Count); - - SocketAsyncEventArgs args = ObjectCache.GetSocketAsyncEventArgs(); - args.SocketError = SocketError.Success; - - args.SetBuffer(null, 0, 0); - args.BufferList = new List>() // TODO pooollllllll! - { - new ArraySegment(header), - new ArraySegment(message.Buffer, message.Offset, message.Count) - }; - args.UserToken = message; - - args.Completed += TcpSendCompleted; - - - bool completingAsync; - try - { - tcpSocket.Send(args.BufferList); - completingAsync = false; - } - catch (Exception) - { - return false; - } - - if (!completingAsync) - TcpSendCompleted(this, args); - - return true; + return tcp.SendMessageReliable(message); } /// @@ -184,12 +171,17 @@ public override bool SendMessageUnreliable(MessageBuffer message) /// Whether the disconnect was successful. public override bool Disconnect() { - if (!CanSend && !IsListening) - return false; + lock (myLock) + { + if (!lockedCanSend && !lockedIsListening) + return false; + } + + PollingThread.RemoveWork(DoPolling); try { - tcpSocket.Shutdown(SocketShutdown.Both); + tcp.Shutdown(); } catch (SocketException) { @@ -198,333 +190,20 @@ public override bool Disconnect() networkListener.UnregisterUdpConnection(this); - CanSend = false; - IsListening = false; - - return true; - } - - /// - /// Receives TCP header followed by a TCP body, looping until the operation becomes asynchronous. - /// - /// The socket args to use during the operations. - private void ReceiveHeaderAndBody(SocketAsyncEventArgs args) - { - while (true) - { - if (!WasHeaderReceiveSucessful(args)) - { - HandleDisconnectionDuringHeaderReceive(args); - return; - } - - if (!IsHeaderReceiveComplete(args)) - { - UpdateBufferPointers(args); - - try - { - bool headerContinueCompletingAsync = tcpSocket.ReceiveAsync(args); - if (headerContinueCompletingAsync) - return; - } - catch (ObjectDisposedException) - { - HandleDisconnectionDuringHeaderReceive(args); - return; - } - - continue; - } - - int bodyLength = ProcessHeader(args); - if (bodyLength >= networkListener.MaxTcpBodyLength) - { - Strike("TCP body length was above allowed limits.", 10); - return; - } - - SetupReceiveBody(args, bodyLength); - while (true) - { - try - { - bool bodyCompletingAsync = tcpSocket.ReceiveAsync(args); - if (bodyCompletingAsync) - return; - } - catch (ObjectDisposedException) - { - HandleDisconnectionDuringBodyReceive(args); - return; - } - - if (!WasBodyReceiveSucessful(args)) - { - HandleDisconnectionDuringBodyReceive(args); - return; - } - - if (IsBodyReceiveComplete(args)) - break; - - UpdateBufferPointers(args); - } - - MessageBuffer bodyBuffer = ProcessBody(args); - - // Start next receive before invoking events - SetupReceiveHeader(args); - bool headerCompletingAsync; - try - { - headerCompletingAsync = tcpSocket.ReceiveAsync(args); - } - catch (ObjectDisposedException) - { - HandleDisconnectionDuringHeaderReceive(args); - return; - } - - ProcessMessage(bodyBuffer); - - if (headerCompletingAsync) - return; - } - } - - /// - /// Event handler for when a TCP header has been received. - /// - /// The invoking object. - /// The socket args used during the operation. - private void AsyncReceiveHeaderCompleted(object sender, SocketAsyncEventArgs args) - { - //We can move straight back into main loop - ReceiveHeaderAndBody(args); - } - - /// - /// Event handler for when a TCP body has been received. - /// - /// The invoking object. - /// The socket args used during the operation. - private void AsyncReceiveBodyCompleted(object sender, SocketAsyncEventArgs args) - { - while (true) + lock (myLock) { - if (!WasBodyReceiveSucessful(args)) - { - HandleDisconnectionDuringBodyReceive(args); - return; - } - - if (IsBodyReceiveComplete(args)) - break; - - UpdateBufferPointers(args); - - try - { - bool bodyContinueCompletingAsync = tcpSocket.ReceiveAsync(args); - if (bodyContinueCompletingAsync) - return; - } - catch (ObjectDisposedException) - { - HandleDisconnectionDuringBodyReceive(args); - return; - } + lockedCanSend = false; + lockedIsListening = false; } - MessageBuffer bodyBuffer = ProcessBody(args); - - // Start next receive before invoking events - SetupReceiveHeader(args); - bool headerCompletingAsync; - try - { - headerCompletingAsync = tcpSocket.ReceiveAsync(args); - } - catch (ObjectDisposedException) - { - HandleDisconnectionDuringHeaderReceive(args); - return; - } - - ProcessMessage(bodyBuffer); - - if (headerCompletingAsync) - return; - - //Now move back into main loop until no more data is present - ReceiveHeaderAndBody(args); - } - - /// - /// Checks if a TCP header was received in its entirety. - /// - /// The socket args used during the operation. - /// If the whole header has been received. - private bool IsHeaderReceiveComplete(SocketAsyncEventArgs args) - { - MessageBuffer headerBuffer = (MessageBuffer)args.UserToken; - - return args.Offset + args.BytesTransferred - headerBuffer.Offset >= headerBuffer.Count; - } - - /// - /// Checks if a TCP body was received in its entirety. - /// - /// The socket args used during the operation. - /// If the whole body has been received. - private bool IsBodyReceiveComplete(SocketAsyncEventArgs args) - { - MessageBuffer bodyBuffer = (MessageBuffer)args.UserToken; - - return args.Offset + args.BytesTransferred - bodyBuffer.Offset >= bodyBuffer.Count; - } - - /// - /// Processes a TCP header received. - /// - /// The socket args used during the operation. - /// The number of bytes in the body. - private int ProcessHeader(SocketAsyncEventArgs args) - { - MessageBuffer headerBuffer = (MessageBuffer)args.UserToken; - - int bodyLength = BigEndianHelper.ReadInt32(headerBuffer.Buffer, headerBuffer.Offset); - - headerBuffer.Dispose(); - - args.Completed -= AsyncReceiveHeaderCompleted; - - return bodyLength; - } - - /// - /// Processes a TCP body received. - /// - /// The socket args used during the operation. - /// The buffer received. - private MessageBuffer ProcessBody(SocketAsyncEventArgs args) - { - args.Completed -= AsyncReceiveBodyCompleted; - - return (MessageBuffer)args.UserToken; - } - - /// - /// Invokes message recevied events and cleans up. - /// - /// The TCP body received. - private void ProcessMessage(MessageBuffer buffer) - { - HandleMessageReceived(buffer, SendMode.Reliable); - - int bytesReceived = buffer.Count; - buffer.Dispose(); - - bytesReceivedCounterTcp.Increment(bytesReceived + 4); - } - - /// - /// Checks if a TCP header was received correctly. - /// - /// The socket args used during the operation. - /// If the receive completed correctly. - private bool WasHeaderReceiveSucessful(SocketAsyncEventArgs args) - { - return args.BytesTransferred != 0 && args.SocketError == SocketError.Success; - } - - /// - /// Checks if a TCP body was received correctly. - /// - /// The socket args used during the operation. - /// If the receive completed correctly. - private bool WasBodyReceiveSucessful(SocketAsyncEventArgs args) - { - return args.BytesTransferred != 0 && args.SocketError == SocketError.Success; - } - - /// - /// Handles a disconnection while receiving a TCP header. - /// - /// The socket args used during the operation. - private void HandleDisconnectionDuringHeaderReceive(SocketAsyncEventArgs args) - { - try - { - UnregisterAndDisconnect(args.SocketError); - } - finally - { - MessageBuffer buffer = (MessageBuffer)args.UserToken; - buffer.Dispose(); - - args.Completed -= AsyncReceiveHeaderCompleted; - ObjectCache.ReturnSocketAsyncEventArgs(args); - } - } - - /// - /// Handles a disconnection while receiving a TCP body. - /// - /// The socket args used during the operation. - private void HandleDisconnectionDuringBodyReceive(SocketAsyncEventArgs args) - { - try - { - UnregisterAndDisconnect(args.SocketError); - } - finally - { - MessageBuffer buffer = (MessageBuffer)args.UserToken; - buffer.Dispose(); - - args.Completed -= AsyncReceiveBodyCompleted; - ObjectCache.ReturnSocketAsyncEventArgs(args); - } - } - - /// - /// Setup a listen operation for a new TCP header. - /// - /// The socket args to use during the operation. - private void SetupReceiveHeader(SocketAsyncEventArgs args) - { - MessageBuffer headerBuffer = MessageBuffer.Create(4); - headerBuffer.Count = 4; - - args.SetBuffer(headerBuffer.Buffer, headerBuffer.Offset, 4); - args.UserToken = headerBuffer; - args.Completed += AsyncReceiveHeaderCompleted; + return true; } - /// - /// Setup a listen operation for a new TCP body. - /// - /// The socket args to use during the operation. - /// The number of bytes in the body. - private void SetupReceiveBody(SocketAsyncEventArgs args, int length) + private void HandleTcpMessage(MessageBuffer buffer, SendMode sendMode) { - MessageBuffer bodyBuffer = MessageBuffer.Create(length); - bodyBuffer.Count = length; + this.HandleMessageReceived(buffer, sendMode); - args.SetBuffer(bodyBuffer.Buffer, bodyBuffer.Offset, length); - args.UserToken = bodyBuffer; - args.Completed += AsyncReceiveBodyCompleted; - } - - /// - /// Updates the pointers on the buffer to continue a receive operation. - /// - /// The socket args to update. - private void UpdateBufferPointers(SocketAsyncEventArgs args) - { - args.SetBuffer(args.Offset + args.BytesTransferred, args.Count - args.BytesTransferred); + bytesReceivedCounterTcp.Increment(buffer.Count + 4); } /// @@ -537,29 +216,6 @@ internal void HandleUdpMessage(MessageBuffer buffer) bytesReceivedCounterUdp.Increment(buffer.Count); } - /// - /// Called when a TCP send has completed. - /// - /// - /// - private void TcpSendCompleted(object sender, SocketAsyncEventArgs e) - { - if (e.SocketError != SocketError.Success) - UnregisterAndDisconnect(e.SocketError); - - e.Completed -= TcpSendCompleted; - - MessageBuffer messageBuffer = (MessageBuffer)e.UserToken; - int bytesSent = messageBuffer.Count; - - //Always dispose buffer when completed! - messageBuffer.Dispose(); - - ObjectCache.ReturnSocketAsyncEventArgs(e); - - bytesSentCounterTcp.Increment(bytesSent + 4); - } - /// /// Called when a UDP send has completed. /// @@ -573,18 +229,43 @@ private void UdpSendCompleted(int bytesSent, SocketError e) bytesSentCounterUdp.Increment(bytesSent); } + private void TcpSendCompleted(MessageBuffer messageBuffer) + { + bytesSentCounterTcp.Increment(messageBuffer.Count + 4); + } + + private bool CheckTcpBodyLength(int bodyLength) + { + if (bodyLength >= networkListener.MaxTcpBodyLength) + { + Strike("TCP body length was above allowed limits.", 10); + return false; + } + + return true; + } + /// /// Called when a socket error has occured. /// /// private void UnregisterAndDisconnect(SocketError error) { - if (CanSend || IsListening) + bool canUnregister = false; + lock (myLock) + { + canUnregister = lockedCanSend || lockedIsListening; + } + + if (canUnregister) { networkListener.UnregisterUdpConnection(this); - CanSend = false; - IsListening = false; + lock (myLock) + { + lockedCanSend = false; + lockedIsListening = false; + } HandleDisconnection(error); } @@ -601,6 +282,14 @@ public override IPEndPoint GetRemoteEndPoint(string name) throw new ArgumentException("Endpoint name must either be TCP or UDP"); } + /// + /// Explicitly performs a step of message polling. + /// + public void DoPolling() + { + tcp.PollReceiveHeaderAndBodyNonBlocking(); + } + #region IDisposable Support private bool disposedValue = false; // To detect redundant calls @@ -610,10 +299,18 @@ protected override void Dispose(bool disposing) { if (disposing) { - if (IsListening || CanSend) + bool shouldDisconnect = false; + lock (myLock) + { + shouldDisconnect = lockedIsListening || lockedCanSend; + } + + if (shouldDisconnect) Disconnect(); - tcpSocket.Close(); + PollingThread.RemoveWork(DoPolling); + + tcp.Dispose(); } disposedValue = true; diff --git a/DarkRift/Dispatching/WorkerThread.cs b/DarkRift/Dispatching/PollingThread.cs similarity index 66% rename from DarkRift/Dispatching/WorkerThread.cs rename to DarkRift/Dispatching/PollingThread.cs index 66167a4..c62c45b 100644 --- a/DarkRift/Dispatching/WorkerThread.cs +++ b/DarkRift/Dispatching/PollingThread.cs @@ -11,7 +11,6 @@ internal static class PollingThread private static readonly object myLock = new object(); private static bool threadStarted; private static readonly List workList = new List(); - private static Action[] threadsafeWorkList = new Action[0]; private static readonly ManualResetEvent stopEvent = new ManualResetEvent(false); @@ -20,7 +19,6 @@ public static void AddWork(Action work) lock (myLock) { workList.Add(work); - threadsafeWorkList = workList.ToArray(); if (!threadStarted) { @@ -31,13 +29,18 @@ public static void AddWork(Action work) public static void RemoveWork(Action work) { + //this blocking on a polling iteration is intended lock (myLock) { - workList.Remove(work); - threadsafeWorkList = workList.ToArray(); + InternalRemoveWork(work); } } + private static void InternalRemoveWork(Action work) + { + workList.Remove(work); + } + public static void StopThread() { stopEvent.Set(); @@ -59,30 +62,40 @@ private static void StartThread() private static void PollingThreadLogic() { var rng = new Random(); - + while (!stopEvent.WaitOne(1)) { - Action[] work = threadsafeWorkList; - rng.Shuffle(work); + lock (myLock) + { + PollingIteration(rng); + } + } + } + + private static void PollingIteration(Random rng) + { + var work = workList; + rng.Shuffle(work); + + for (int i = work.Count - 1; i >= 0; --i) + { + var item = work[i]; - foreach (Action item in work) + try + { + item?.Invoke(); + } + catch (Exception ex) { - try - { - item?.Invoke(); - } - catch (Exception ex) - { - RemoveWork(item); - ExceptionHandler?.Invoke(ex); - } + InternalRemoveWork(item); + ExceptionHandler?.Invoke(ex); } } } - private static void Shuffle(this Random rng, T[] array) + private static void Shuffle(this Random rng, IList array) { - int n = array.Length; + int n = array.Count; while (n > 1) { int k = rng.Next(n--); diff --git a/DarkRift/SynchronousTcpSocket.cs b/DarkRift/SynchronousTcpSocket.cs index 2329e4b..41bd224 100644 --- a/DarkRift/SynchronousTcpSocket.cs +++ b/DarkRift/SynchronousTcpSocket.cs @@ -12,6 +12,8 @@ internal class SynchronousTcpSocket : IDisposable private readonly Action disconnect; private readonly Action handleMessage; + public Func CheckBodyLength { get; set; } + public Action OnSendCompleted { get; set; } private SocketAsyncEventArgs tcpArgs; private TcpReceiveState tcpReceiveState; @@ -97,6 +99,10 @@ public void PollReceiveHeaderAndBodyNonBlocking() return; int bodyLength = ProcessHeader(args); + if (CheckBodyLength != null && !CheckBodyLength(bodyLength)) + { + return; + } SetupReceiveBody(args, bodyLength); } @@ -128,6 +134,8 @@ private bool IsTcpReceiveComplete(SocketAsyncEventArgs args) return args.Offset + tcpBytesTransferred - buffer.Offset >= buffer.Count; } + public bool CheckAvailable { get; set; } = true; + private bool PollReceiveTcpNonBlocking(SocketAsyncEventArgs args) { while (!IsTcpReceiveComplete(args)) @@ -136,7 +144,7 @@ private bool PollReceiveTcpNonBlocking(SocketAsyncEventArgs args) int bytesAvailable = Socket.Available; - if (bytesAvailable == 0) + if (CheckAvailable && bytesAvailable == 0) return false; try @@ -160,7 +168,10 @@ private bool PollReceiveTcpNonBlocking(SocketAsyncEventArgs args) } if (tcpBytesTransferred == 0) + { + HandleDisconnectionDuringTcpReceive(args); return false; + } } return true; @@ -262,8 +273,11 @@ private void SendCompleted(SocketAsyncEventArgs e) if (e.SocketError != SocketError.Success) disconnect(e.SocketError); + MessageBuffer messageBuffer = (MessageBuffer)e.UserToken; + OnSendCompleted?.Invoke(messageBuffer); + //Always dispose buffer when completed! - ((MessageBuffer)e.UserToken).Dispose(); + messageBuffer.Dispose(); ObjectCache.ReturnSocketAsyncEventArgs(e); } From bef9de7519dac9e05073c6d6a7721f9cf5f6caf8 Mon Sep 17 00:00:00 2001 From: "petter.hansson" Date: Tue, 15 Nov 2022 16:48:28 +0100 Subject: [PATCH 7/7] ExplicitPolling setting in client, and DoPolling renamed to PollSockets. --- DarkRift.Client/BichannelClientConnection.cs | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/DarkRift.Client/BichannelClientConnection.cs b/DarkRift.Client/BichannelClientConnection.cs index bf7b8e3..527d809 100644 --- a/DarkRift.Client/BichannelClientConnection.cs +++ b/DarkRift.Client/BichannelClientConnection.cs @@ -38,6 +38,11 @@ public bool NoDelay { set => tcp.Socket.NoDelay = value; } + /// + /// When true no background thread is used and polling must instead be conducted by user by calling PollSockets(). + /// + public bool ExplicitPolling { get; set; } + /// public override IEnumerable RemoteEndPoints => new IPEndPoint[] { RemoteTcpEndPoint, RemoteUdpEndPoint }; @@ -205,8 +210,9 @@ public override void Connect() //Mark connected to allow sending connectionState = ConnectionState.Connected; - - PollingThread.AddWork(DoPolling); + + if (!ExplicitPolling) + PollingThread.AddWork(PollSockets); } /// @@ -235,7 +241,7 @@ public override bool Disconnect() connectionState = ConnectionState.Disconnected; - PollingThread.RemoveWork(DoPolling); + PollingThread.RemoveWork(PollSockets); tcp.Shutdown(); @@ -256,7 +262,7 @@ public override IPEndPoint GetRemoteEndPoint(string name) /// /// Explicitly performs a step of message polling. /// - public void DoPolling() + public void PollSockets() { tcp.PollReceiveHeaderAndBodyNonBlocking(); udp.PollReceiveBodyNonBlocking(); @@ -272,7 +278,7 @@ private void Disconnect(SocketError error) { connectionState = ConnectionState.Disconnected; - PollingThread.RemoveWork(DoPolling); + PollingThread.RemoveWork(PollSockets); HandleDisconnection(error); } @@ -295,7 +301,7 @@ protected override void Dispose(bool disposing) { Disconnect(); - PollingThread.RemoveWork(DoPolling); + PollingThread.RemoveWork(PollSockets); tcp.Socket.Close(); udp.Socket.Close();