#region CPL License /* Nuclex Framework Copyright (C) 2002-2009 Nuclex Development Labs This library is free software; you can redistribute it and/or modify it under the terms of the IBM Common Public License as published by the IBM Corporation; either version 1.0 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the IBM Common Public License for more details. You should have received a copy of the IBM Common Public License along with this library */ #endregion using System; using System.Net.Sockets; using System.Threading; namespace Nuclex.Networking { // Idea: // Refactor some of the functionality into a separate class so besides this // socket receiver, an adapter can be built that allows cancellation of // asynchronous socket requests /// Asynchronously receives data from a socket /// /// This class simplifies the implementation of servers that need to receive /// unsolicited data from sockets asynchronously. It uses a lock-free /// receiving loop to ensure maximum performance for the receiver. /// public class SocketReceiver : IDisposable { /// Initializes a new connection from a HTTP client /// Socket of the connected client /// /// This object takes ownership of the socket and will close it upon being /// disposed. This cannot be avoided because the only way to stop a waiting /// receive request in the .NET framework is to close the receiving socket. /// public SocketReceiver(Socket socket) : this(socket, 256) { } /// Initializes a new connection from a HTTP client /// Socket of the connected client /// Size of the receive buffer /// /// This object takes ownership of the socket and will close it upon being /// disposed. This cannot be avoided because the only way to stop a waiting /// receive request in the .NET framework is to close the receiving socket. /// public SocketReceiver(Socket socket, int bufferSize) { this.dataReceivedDelegate = new AsyncCallback(this.dataReceived); this.socket = socket; this.buffer = new byte[bufferSize]; // Make sure the Socket and its receive buffer are written to memory. Since // the receive loop cannot be running at this time (no call to Start() yet), // we can be sure that it will (no matter which thread it runs in) either pull // all three fields from memory or run on the same CPU cache as the constructor // -- thus we can avoid declaring the above three fields as volatile! Thread.MemoryBarrier(); } /// Begins receiving data from the socket /// /// Normally, this method is to be called in the constructor of the deriving class /// to begin the asynchronous receive loop after you initialized any additional /// fields you are using in the OnDataReceived() callback. /// protected void Start() { lock(this) { // If a shutdown was requested (eg. Dispose() runs, possible in another // thread), do not start. This is checked before wasStarted is set because // otherwise, the Dispose() thread might see wasStarted being true and // try to wait for the end of a non-existing thread. if(Thread.VolatileRead(ref this.shutdownRequested) != 0) { return; } // If we were already started, do nothing. Multiple calls to Start() should // not happen, but are tolerated and should not result in multiple asynchronous // receive loop competing to get data from the socket. if(this.wasStarted) { System.Diagnostics.Trace.Assert( !this.wasStarted, "Start called on an already started socket receiver" ); return; } // Remember that we were started so Dispose() knows that it needs to // stop the receive loop and to catch multiple Start() calls. this.wasStarted = true; } // lock(this) // Begin processing data from the socket asynchronously. This is done in a // ThreadPool thread to eliminate the risk of the socket doing several synchronous // runs of returning data, thereby blocking the calling thread. ThreadPool.QueueUserWorkItem(new WaitCallback(this.startReceiving)); } /// Immediately stops the receive loop and releases all resources public void Dispose() { // Required to a) prevent races with threads calling the Start() method and // to b) avoid chaos when two threads try to call Dispose() at the same time. lock(this) { // If we're already disposed, do nothing if(Thread.VolatileRead(ref this.shutdownRequested) != 0) { return; } // If the asynchronous receive loop has already started, we need to shut // it down before we can safely release our resources if(this.wasStarted) { // The asynchronous receive loop will check the 'shutdownRequested' variable // without synchronization (in order to avoid locking in the receive loop). // However, when it detects the shutdownRequested flag, it will set this // event and shut down. Thus we need to create the event before setting the // shutdown flag! this.shutdownCompleteEvent = new ManualResetEvent(false); try { // Cache the socket in case this thread is preempted by the socket receiver // thread, setting the socket to null right after we have set the // shutdownRequested field Socket openSocket = this.socket; // Make the receive loop shut down. If it is currently waiting for incoming // data, the only way to get it out of this state is to close the socket. Thread.VolatileWrite(ref this.shutdownRequested, 1); openSocket.Close(); // Wait for the asynchronous receive loop to end this.shutdownCompleteEvent.WaitOne(); } finally { this.shutdownCompleteEvent.Close(); } } else { // Asynchronous receive loop was not started yet. In case we preempted the // Start() method directly at the top, wasStarted will never be set once the // thread that runs Start() enters the lock and sees the shutdown flag. Thread.VolatileWrite(ref this.shutdownRequested, 1); } // Helps the GC on Compact Framework builds this.socket = null; } // lock(this) } /// Shuts down the connection graciously /// /// Using this method is preferrable to simply disposing the SocketReceiver /// because it gives the other side a chance to be notified when the connection /// goes down and cleanly stops the transmission of data. /// protected void Shutdown() { lock(this) { // If we're already disposed, do nothing if(Thread.VolatileRead(ref this.shutdownRequested) != 0) { return; } // The socket can be disposed at any time, so make sure we're not running into // a race condition for the reference here and store it locally Socket socket = this.socket; if(socket == null) return; // If the connection still appears to be active, we'll attempt a gracious // disconnect before closing the socket for good. Of course, this leads to // a race condition (between our if and a possible connection shutdown // outside of our control) that we have to simply accept due to the design // of the .NET Socket library, thus the try..finally try { if(this.socket.Connected) { this.socket.Shutdown(SocketShutdown.Both); } } finally { // CHECK: OnConnectionDropped() may be called here // OnConnectionDropped() might be called when the other side closes // its connection before we enter the Dispose() method (i.e. _real_ fast). // Is this a problem for the user? Dispose(); } } // lock(this) } /// Called whenever data is received on the socket /// Buffer containing the received data /// Number of bytes that have been received protected virtual void OnDataReceived(byte[] buffer, int receivedByteCount) { } /// Called when the connection has been dropped by the peer protected virtual void OnConnectionDropped() { } /// Begins receiving data from the socket in the background /// Not used private void startReceiving(object state) { // Take a reference to the socket so we don't run race conditions with // Dispose() setting the socket to null and us accessing it. Socket socket = this.socket; if(ReferenceEquals(socket, null)) { safelyEndReceivingLoop(); return; } // Since the BeginReceive() call might complete synchronously, we may have to // repeat the call several times. If we just invoked this method again from the // dataReceived() callback, we would go deeper and deeper in the call stack, // possibly causing a StackOverflowException in extreme cases. for(; ; ) { // Stop the receive loop if the socket receiver was requested to shut down if(Thread.VolatileRead(ref this.shutdownRequested) != 0) { this.shutdownCompleteEvent.Set(); return; } IAsyncResult asyncResult; try { // Begin receiving data from the socket. This call can either result in a // synchronous invocation of the callback (if data was already in the socket's // internal buffer) or in an asynchronous invocation (if the socket has not // yet received any data). asyncResult = this.socket.BeginReceive( this.buffer, 0, this.buffer.Length, SocketFlags.None, this.dataReceivedDelegate, null ); } catch(ObjectDisposedException) { // This can happen during shutdown due to the design of the socket class // in the .NET framework. We try our best to avoid it, but there's no // guarantee that the socket isn't closed by another thread just when it // received some data. safelyEndReceivingLoop(); return; } // If the request is running asynchronously (meaning no data was in the // socket's internal buffer yet; see comment above), exit this thread. // The ThreadPool thread on which the socket called the dataReceived callback // will initiate next startReceiving() call as soon as data arrives. if(!asyncResult.CompletedSynchronously) { break; } } } /// Called when data has been received on the socket /// Handle of the asynchronous request private void dataReceived(IAsyncResult asyncResult) { #if DEBUG // In debug mode, give the thread a reasonable name so the user can clearly // see the purpose of threads in the debugger's thread view. Thread.CurrentThread.Name = "HttpClientConnection socket receiver"; #endif // If the socket is being closed by another thead, it will be set to null in // order to ensure it's no longer used to avoid multiple close attempts if // dispose is called more than once. To prevent the race condition that could // result, we take a copy of the socket and Socket socket = this.socket; if(ReferenceEquals(socket, null)) { safelyEndReceivingLoop(); return; } int receivedBytes; try { // If the socket has been closed, try to get out of here without causing // an exception. EndReceive() does not have to be called when the socket has // been disposed already (you generally shouldn't cease any communication with // disposed objects) if(!this.socket.Connected) { safelyEndReceivingLoop(); return; } // Now we've got a good chance that the socket has not been closed yet, so // try and hope for the best. The design of the socket class doesn't provide us // with a better way for the standard async pattern, so we have to accept and // eat up the ObjectDisposedException if it occurs. receivedBytes = this.socket.EndReceive(asyncResult); } catch(ObjectDisposedException) { // This can happen during shutdown due to the design of the socket class // in the .NET framework. We try our best to avoid it, but there's no // guarantee that the socket isn't closed by another thread just when it // received some data. safelyEndReceivingLoop(); return; } // If the we're being asked to shut down, do so without invoking the callbacks. // We will only call OnConnectionDropped() when _the_other_side_ closes the // connection. If the user called Dispose(), we don't want to invoke the callback! if(Thread.VolatileRead(ref this.shutdownRequested) != 0) { this.shutdownCompleteEvent.Set(); return; } // Receiving 0 bytes indicates that the connection has been dropped by the peer // using the graceful connection shutdown procedure, so in that case, we're done if(receivedBytes == 0) { safelyEndReceivingLoop(); OnConnectionDropped(); return; } // Finally, we can process the data we received on the socket OnDataReceived(this.buffer, receivedBytes); // Resume listening for incoming data. This is done after the processReceivedData() // intentionally - the idea is the processReceivedData() should not throw. If it // does, it obviously coughed on the data received and we don't know if it processed // part of it, all of it, and whether it will cough again, so what's the use on // giving it the next slice of data, cut at an arbitrary point? if(!asyncResult.CompletedSynchronously) { startReceiving(null); } } /// Ends the receiving loop in a safe manner /// /// Called when the asynchronous receiving loop is terminated unexpectedly. This /// happens when the other side closes the connection or a socket error occurs. /// On the remote chance that someone is running the Dispose() method just now, /// we have to guarantee that Dispose() won't sit there, forever waiting for its /// shutdownCompleteEvent() to become set. /// private void safelyEndReceivingLoop() { int wasShuttingDown = Interlocked.Exchange(ref this.shutdownRequested, 1); if(wasShuttingDown != 0) { this.shutdownCompleteEvent.Set(); } else { this.socket.Close(); this.socket = null; } } /// Delegate for the DataReceived callback method private AsyncCallback dataReceivedDelegate; /// Socket of the connected client private Socket socket; /// Buffer into which received data is written by the socket private byte[] buffer; /// Whether the Start() method has been called private bool wasStarted; /// True if the receive loop should stop private int shutdownRequested; /// Used by Dispose() to wait for the receive loop to end private volatile ManualResetEvent shutdownCompleteEvent; } } // namespace Nuclex.Networking