From a0bf61f32118c07ff63af389be17aa24ab2103ad Mon Sep 17 00:00:00 2001 From: Kevin Knoop Date: Tue, 16 Sep 2014 12:34:55 +0200 Subject: [PATCH] Implemented Dispose pattern for both Buffer and BufferEvent with no finalizers. The user should ensure Disposed is called when no longer needed on the main EV Loop thread. Removed redundant IntPtr allocation in Buffer.cs. BufferEvent delegates now saved as private fields to prevent the GC collecting these. Otherwise the native code calls back to dangling pointers. Added bufferevent_set_timeouts. Fixed ConnectionListener not providing correct IPEndPoint for new connections. Tested OK on Ubuntu. Added EventSyncContext to provide a means for running an EV Loop and for posting other work you want run on the same thread. --- Oars/Buffer.cs | 17 ++++++++++---- Oars/BufferEvent.cs | 47 ++++++++++++++++++++++++++++++++------ Oars/ConnectionListener.cs | 28 +++++++++++++++++------ Oars/Event.cs | 17 +++++++------- Oars/EventBase.cs | 3 ++- Oars/EventSyncContext.cs | 43 ++++++++++++++++++++++++++++++++++ Oars/Interop.cs | 35 +++++----------------------- 7 files changed, 134 insertions(+), 56 deletions(-) mode change 100755 => 100644 Oars/BufferEvent.cs create mode 100644 Oars/EventSyncContext.cs diff --git a/Oars/Buffer.cs b/Oars/Buffer.cs index ca79eeb..467c112 100755 --- a/Oars/Buffer.cs +++ b/Oars/Buffer.cs @@ -6,6 +6,7 @@ namespace Oars { public sealed class Buffer : IDisposable { + bool disposed; IntPtr handle; bool ownsBuffer; @@ -22,10 +23,20 @@ public Buffer(IntPtr handle) this.handle = handle; } + void Dispose(bool disposing) + { + if (!disposed) + { + disposed = true; + + if (ownsBuffer) + evbuffer_free(handle); + } + } + public void Dispose() { - if (ownsBuffer) - evbuffer_free(handle); + Dispose(true); } public bool Add(byte[] data, int offset, int count) @@ -41,8 +52,6 @@ public int Remove(byte[] data, int offset, int count) if (offset + count > data.Length) throw new Exception("offset + count > data.Length"); - var c = new IntPtr(count); - unsafe { fixed (byte *ptr = &data[0]) return evbuffer_remove(handle, ptr, count); diff --git a/Oars/BufferEvent.cs b/Oars/BufferEvent.cs old mode 100755 new mode 100644 index 13b6026..2f4b06a --- a/Oars/BufferEvent.cs +++ b/Oars/BufferEvent.cs @@ -22,6 +22,7 @@ internal BufferEventEventArgs(BufferEventEvents events) public sealed class BufferEvent : IDisposable { + bool disposed; public EventBase EventBase { get; private set; } IntPtr bev; @@ -32,8 +33,8 @@ public sealed class BufferEvent : IDisposable public event EventHandler Write; public event EventHandler Event; - public Buffer Input { get { return input ?? (input = new Buffer(bufferevent_get_input(bev))); } } - public Buffer Output { get { return output ?? (output = new Buffer(bufferevent_get_output(bev))); } } + public Buffer Input { get { if (disposed) throw new ObjectDisposedException("Input EVBuffer"); return input ?? (input = new Buffer(bufferevent_get_input(bev))); } } + public Buffer Output { get { if (disposed) throw new ObjectDisposedException("Ouput EVBuffer"); return output ?? (output = new Buffer(bufferevent_get_output(bev))); } } int readLow, readHigh = -1; @@ -54,23 +55,51 @@ void SetReadWatermark() bufferevent_setwatermark(bev, Events.EV_READ, new IntPtr(readLow), new IntPtr(readHigh)); } - public BufferEvent(EventBase eventBase, IntPtr socket) + bufferevent_data_cb readdel; + bufferevent_data_cb writedel; + bufferevent_event_cb eventdel; + + public BufferEvent(EventBase eventBase, IntPtr socket, int timeout) { var options = (int)(BufferEventOptions.CloseOnFree | BufferEventOptions.DeferCallbacks); bev = bufferevent_socket_new(eventBase.Handle, socket, options); + var t = timeval.FromTimeSpan(TimeSpan.FromMilliseconds(timeout)); + bufferevent_set_timeouts(bev, ref t, ref t); //Console.WriteLine("bufferevent_socket_new returned " + bev.ToInt32()); // none of these can throw exceptions. - var readCb = Marshal.GetFunctionPointerForDelegate(new bufferevent_data_cb(ReadCallbackInternal)); - var writeCb = Marshal.GetFunctionPointerForDelegate(new bufferevent_data_cb(WriteCallbackInternal)); - var eventCb = Marshal.GetFunctionPointerForDelegate(new bufferevent_event_cb(EventCallbackInternal)); + readdel = new bufferevent_data_cb(ReadCallbackInternal); + var readCb = Marshal.GetFunctionPointerForDelegate(readdel); + writedel = new bufferevent_data_cb(WriteCallbackInternal); + var writeCb = Marshal.GetFunctionPointerForDelegate(writedel); + eventdel = new bufferevent_event_cb(EventCallbackInternal); + var eventCb = Marshal.GetFunctionPointerForDelegate(eventdel); bufferevent_setcb(bev, readCb, writeCb, eventCb, IntPtr.Zero); } + void Dispose(bool disposing) + { + if (!disposed) + { + disposed = true; + if (disposing) + { + if (input != null) + input.Dispose(); + if (output != null) + output.Dispose(); + readdel = null; + writedel = null; + eventdel = null; + } + bufferevent_free(bev); + } + } + public void Dispose() { - bufferevent_free(bev); + Dispose(true); } public void Enable() @@ -125,6 +154,7 @@ void EventCallbackInternal(IntPtr bev, short what, IntPtr ctx) private delegate void bufferevent_data_cb(IntPtr bev, IntPtr ctx); private delegate void bufferevent_event_cb(IntPtr bev, short what, IntPtr ctx); + [Flags] enum BufferEventOptions { CloseOnFree = 1 << 0, @@ -132,6 +162,9 @@ enum BufferEventOptions DeferCallbacks = 1 << 2 } + [DllImport("event_core")] + static extern void bufferevent_set_timeouts(IntPtr bev, ref timeval timeoutread, ref timeval timeoutwrite); + [DllImport("event_core")] static extern IntPtr bufferevent_socket_new(IntPtr event_base, IntPtr socket, int options); diff --git a/Oars/ConnectionListener.cs b/Oars/ConnectionListener.cs index 37ca309..e29c78c 100755 --- a/Oars/ConnectionListener.cs +++ b/Oars/ConnectionListener.cs @@ -5,9 +5,21 @@ namespace Oars { + public sealed class ConnectionAcceptedEventArgs : EventArgs + { + public IntPtr Socket { get; private set; } + public IPEndPoint RemoteEndPoint { get; private set; } + + internal ConnectionAcceptedEventArgs(IntPtr socket, IPEndPoint remoteEndPoint) + { + Socket = socket; + RemoteEndPoint = remoteEndPoint; + } + } + public sealed class ConnectionListener : IDisposable { - public Action ConnectionAccepted; + public event EventHandler ConnectionAccepted; public EventBase Base { get; set; } public IPEndPoint ListenEndPoint { get; private set; } @@ -52,17 +64,19 @@ public void Disable() disabled = true; } - void ConnectionCallback(IntPtr listener, IntPtr socket, sockaddr_in address, int socklen, IntPtr ctx) + void ConnectionCallback(IntPtr listener, IntPtr socket, IntPtr address, int socklen, IntPtr ctx) { try { - if (ConnectionAccepted != null) - ConnectionAccepted(socket, address.ToIPEndPoint()); + if (ConnectionAccepted != null){ + sockaddr_in sockAddrIn = (sockaddr_in)Marshal.PtrToStructure(address, typeof(sockaddr_in)); + ConnectionAccepted(this, new ConnectionAcceptedEventArgs(socket, + new IPEndPoint(sockAddrIn.sin_addr.s_addr, (ushort)IPAddress.NetworkToHostOrder((short)sockAddrIn.sin_port)))); + } } - catch (Exception e) + catch (Exception) { Debug.WriteLine("Exception during connection listener callback."); - //Extensions.HandleException("EVConnListener callback", e); } } @@ -77,7 +91,7 @@ enum ConnectionListenerOptions } [UnmanagedFunctionPointer(CallingConvention.Cdecl)] - private delegate void evconnlistener_cb(IntPtr listener, IntPtr socket, sockaddr_in address, int socklen, IntPtr ctx); + private delegate void evconnlistener_cb(IntPtr listener, IntPtr socket, IntPtr address, int socklen, IntPtr ctx); [DllImport("event_core")] private unsafe static extern IntPtr evconnlistener_new_bind(IntPtr event_base, IntPtr cb, IntPtr ctx, short flags, short backlog, ref sockaddr_in sa, short socklen); diff --git a/Oars/Event.cs b/Oars/Event.cs index b97ccde..610e4ad 100755 --- a/Oars/Event.cs +++ b/Oars/Event.cs @@ -4,6 +4,7 @@ namespace Oars { + [Flags] public enum Events : short { None = 0, @@ -17,14 +18,14 @@ public enum Events : short public sealed class Event : IDisposable { - public IntPtr Socket { get; private set; } + IntPtr fd; + IntPtr fp; public IntPtr Handle { get; private set; } public Events Events { get; private set; } - public Action Activated; + public event EventHandler Activated; bool pending; Delegate cb; - IntPtr fp; public static Event CreateTimer(EventBase eventBase) { @@ -38,17 +39,17 @@ public static Event CreateTimer(EventBase eventBase, bool persist) public Event(EventBase eventBase, IntPtr fd, Events what) { - Socket = fd; + this.fd = fd; cb = Delegate.CreateDelegate(typeof(event_callback_fn), this, "EventCallbackInternal"); fp = Marshal.GetFunctionPointerForDelegate(cb); - Debug.Write("EVEvent created with fd " + fd.ToInt32().ToString("x") + ", cb " + fp.ToInt32().ToString("x")); + Debug.WriteLine("EVEvent created with fd " + fd.ToInt32().ToString("x") + ", cb " + fp.ToInt32().ToString("x")); Handle = event_new(eventBase.Handle, fd, (short)what, fp, IntPtr.Zero); } public void Dispose() { - Debug.Write("EVEvent disposed with fd " + Socket.ToInt32().ToString("x") + ", cb " + fp.ToInt32().ToString("x")); + Debug.WriteLine("EVEvent disposed with fd " + fd.ToInt32().ToString("x") + ", cb " + fp.ToInt32().ToString("x")); ThrowIfDisposed(); if (pending) @@ -95,9 +96,9 @@ void EventCallbackInternal(IntPtr fd, short what, IntPtr ctx) { Debug.WriteLine("Event on fd {0} activated with events {1}.", fd.ToInt32(), Events); if (Activated != null) - Activated(); + Activated(this, EventArgs.Empty); } - catch (Exception e) + catch (Exception) { Debug.WriteLine("Exception during event callback."); } diff --git a/Oars/EventBase.cs b/Oars/EventBase.cs index 96c7571..dd3afba 100755 --- a/Oars/EventBase.cs +++ b/Oars/EventBase.cs @@ -6,7 +6,8 @@ namespace Oars public enum LoopOptions : int { Once = 0x01, - NonBlock = 0x02 + NonBlock = 0x02, + NoExitOnEmpy = 0x04 } public sealed class EventBase : IDisposable diff --git a/Oars/EventSyncContext.cs b/Oars/EventSyncContext.cs new file mode 100644 index 0000000..3ca6b32 --- /dev/null +++ b/Oars/EventSyncContext.cs @@ -0,0 +1,43 @@ +using System.Threading; +using System.Collections.Concurrent; +using System.Collections.Generic; + +namespace Oars +{ + public sealed class EventSyncContext : SynchronizationContext + { + readonly BlockingCollection> m_queue = + new BlockingCollection>(); + + EventBase eventBase = new EventBase(); + + public EventBase EventBase + { + get + { + return eventBase; + } + } + + public override void Post(SendOrPostCallback d, object state) + { + m_queue.Add(new KeyValuePair(d, state)); + } + + public void RunOnCurrentThread() + { + KeyValuePair workItem; + while (true) + { + while (m_queue.TryTake(out workItem, 1)) + workItem.Key(workItem.Value); + eventBase.Loop(LoopOptions.NonBlock); + } + } + + public void Complete() { + m_queue.CompleteAdding(); + } + } +} + diff --git a/Oars/Interop.cs b/Oars/Interop.cs index 71fb8e2..fa62b7b 100755 --- a/Oars/Interop.cs +++ b/Oars/Interop.cs @@ -5,7 +5,7 @@ namespace Oars { - internal struct timeval + struct timeval { public int tv_sec; public int tv_usec; @@ -71,7 +71,7 @@ public IPEndPoint ToIPEndPoint() [StructLayout(LayoutKind.Sequential)] struct in_addr { - public long s_addr; + public uint s_addr; } static class OperatingSystem @@ -125,38 +125,15 @@ static bool IsRunningOnMac() } } - public static class FDExtensions + static class FDExtensions { + [DllImport("libc")] + static extern int close(IntPtr fd); + public static int Close(this IntPtr fd) { return close(fd); } - - public static int Recv(this IntPtr fd, ArraySegment buffer, int flags) - { - unsafe - { - fixed (byte* ptr = &(buffer.Array[buffer.Offset])) - return recv(fd, ptr, buffer.Count, flags); - } - } - - public static int Send(this IntPtr fd, ArraySegment buffer, int flags) - { - unsafe { - fixed (byte *ptr = &(buffer.Array[buffer.Offset])) - return send(fd, ptr, buffer.Count, flags); - } - } - - [DllImport("libc")] - static extern int close(IntPtr fd); - - [DllImport("libc")] - static unsafe extern int send(IntPtr fd, byte* buffer, int length, int flags); - - [DllImport("libc")] - static unsafe extern int recv(IntPtr fd, byte* buffer, int length, int flags); } // lifted this from Mono.Unix/Stdlib.cs