
 


 

Chapter 3 Part 7:

Threading and the Asynchronous Pattern

 

 

What is covered in Chapter 3, Part 7?

  1. Asynchronous Pattern

  2. C++ Asynchronous I/O Program Example

 

 

Asynchronous Pattern

 

Now that you have an understanding of how to use threads to perform tasks asynchronously, we’ll present another way you can perform tasks asynchronously by using the asynchronous pattern that’s available in many of the classes in the .NET Framework. The asynchronous pattern is designed around methods that can potentially block when they perform their operation. Methods that block typically take a long time to perform a task such as writing data to a file or reading data from a network.

Classes that are designed to use the asynchronous pattern for methods that block have a BeginXXX() method to start a task and an EndXXX() method that completes a task. The XXX portion represents an actual name of the blocking task in the class. For example, in the FileStream class, the Write() method can potentially block when writing data to a file. The FileStream class has BeginWrite() and EndWrite() counterpart methods that use the asynchronous pattern to prevent blocking on an I/O operation.

You should not mix the use of asynchronous calls with synchronous calls from the same instance of a class in the .NET Framework. For example, do not attempt to perform a FileStream Read() synchronous operation while a FileStream BeginRead() asynchronous operation is running. Doing so can cause unpredictable behavior in the class at hand.

To use the asynchronous pattern, you call the BeginXXX() method and supply a callback delegate to the call. BeginXXX() returns immediately and the class at hand runs the XXX operation in parallel, which means that your calling application (or calling thread) is free to continue processing other code. Each .NET Framework class internally handles asynchronous processing using threads and other operating system asynchronous mechanisms. The main point to remember here is that all the classes that use the asynchronous pattern present the same design from your application’s point of view. When an asynchronous operation completes, your supplied delegate method is invoked to process the completed task results. Inside your delegate method, you have to call the EndXXX() counterpart method to retrieve those results.
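The flow described above can be sketched in C# with the FileStream BeginWrite() and EndWrite() pair. This is a minimal illustration rather than a definitive implementation: the class name ApmWriteSketch, the method name WriteWithCallback and the use of a ManualResetEvent to wait for the callback are assumptions made for this example only.

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading;

// Hypothetical helper used only for this illustration.
public static class ApmWriteSketch
{
    // Writes text to path with BeginWrite()/EndWrite() and returns the file length in bytes.
    public static long WriteWithCallback(string path, string text)
    {
        byte[] data = Encoding.UTF8.GetBytes(text);
        Exception failure = null;

        using (ManualResetEvent done = new ManualResetEvent(false))
        using (FileStream fs = new FileStream(path, FileMode.Create, FileAccess.Write,
                                              FileShare.None, 4096, true)) // true requests asynchronous I/O
        {
            // BeginWrite() returns immediately; the callback runs when the write completes.
            fs.BeginWrite(data, 0, data.Length, delegate(IAsyncResult ar)
            {
                FileStream stream = (FileStream)ar.AsyncState; // the state object passed below
                try { stream.EndWrite(ar); }                   // EndWrite() is called once, in the callback
                catch (Exception e) { failure = e; }
                finally { done.Set(); }
            }, fs);

            // The calling thread is free to do other work here.
            // This sketch simply waits so that the stream is not closed too early.
            done.WaitOne();
        }

        if (failure != null) throw failure;
        return new FileInfo(path).Length;
    }
}
```

Calling ApmWriteSketch.WriteWithCallback(Path.GetTempFileName(), "hello") should leave a 5-byte file behind. A real application would usually continue with other processing instead of waiting on the event.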

When your application starts an asynchronous BeginXXX() call with a callback delegate, the calling thread should not also call the EndXXX() counterpart method using the IAsyncResult value returned from BeginXXX(). If it does, that EndXXX() call will block until the asynchronous operation has completed. Then, when your delegate method calls EndXXX() for the same operation, an InvalidOperationException will be thrown on the delegate’s EndXXX() call, because EndXXX() should be called only once for each BeginXXX().

For example, let’s describe how to process a Read() call asynchronously from the NetworkStream class using the asynchronous pattern. We chose Read() because it will typically block while you wait for data to arrive on a network stream. To call Read() asynchronously, you call BeginRead() and supply a delegate of type AsyncCallback that will be invoked when the BeginRead() operation completes. An AsyncCallback method is similar to a thread delegate method, as described earlier in the chapter. However, an AsyncCallback method takes an IAsyncResult parameter that’s used to retrieve the results of the completed asynchronous operation. The following code fragment demonstrates how to design an AsyncCallback method that handles the completion of a BeginRead() asynchronous operation from the NetworkStream class we described in Chapter 2.

 

C#

 

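void ProcessReceiveResults(IAsyncResult AsyncResult)
{
    // The state object passed to BeginRead() is the NetworkStream itself.
    NetworkStream NS = (NetworkStream) AsyncResult.AsyncState;

    try
    {
        // Complete the asynchronous read and get the number of bytes received.
        int BytesRead = NS.EndRead(AsyncResult);

        if (BytesRead == 0)
        {
            // Zero bytes means the peer has closed the connection.
            NS.Close();
            return;
        }
    }
    catch (Exception e)
    {
        Console.WriteLine("Failed to read network stream with error: " + e.Message);
        NS.Close();
    }
}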

 

Visual Basic .NET

 

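Shared Sub ProcessReceiveResults(ByVal AsyncResult As IAsyncResult)
    ' The state object passed to BeginRead() is the NetworkStream itself.
    Dim NS As NetworkStream = CType(AsyncResult.AsyncState, NetworkStream)

    Try
        ' Complete the asynchronous read and get the number of bytes received.
        Dim BytesRead As Integer = NS.EndRead(AsyncResult)

        If BytesRead = 0 Then
            ' Zero bytes means the peer has closed the connection.
            NS.Close()
            Exit Sub
        End If
    Catch e As Exception
        Console.WriteLine("Failed to read network stream with error: " & e.Message)
        NS.Close()
    End Try
End Sub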

 

The AsyncResult parameter receives the IAsyncResult object that you have to pass to the EndRead() counterpart method. (Alternatively, you can use the IAsyncResult object that’s returned from the originating BeginRead() call.) IAsyncResult also exposes an important property named AsyncState, which holds the state parameter that was passed in the originating BeginRead() call. Typically, you’ll use the state parameter to pass information that’s related to the asynchronous BeginXXX() call. For example, in the preceding code fragment, we passed an instance of the NetworkStream class to the delegate and retrieved it from AsyncState.
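The state parameter does not have to be the stream itself. The following C# sketch passes a small state object that carries the stream, the receive buffer and the text received so far, so the callback can process the data and post the next read. The names ReadState, AsyncStateSketch, ReadAll and OnRead are hypothetical and exist only for this illustration; any Stream can be supplied in place of a NetworkStream.

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading;

// Hypothetical state object: carries everything the callback needs.
public sealed class ReadState
{
    public Stream Source;
    public byte[] Buffer = new byte[4096];
    public StringBuilder Received = new StringBuilder();
    public ManualResetEvent Done = new ManualResetEvent(false);
}

public static class AsyncStateSketch
{
    // Reads the stream to its end with BeginRead()/EndRead() and returns the text received.
    public static string ReadAll(Stream source)
    {
        ReadState state = new ReadState();
        state.Source = source;

        // The last argument becomes IAsyncResult.AsyncState in the callback.
        source.BeginRead(state.Buffer, 0, state.Buffer.Length, OnRead, state);

        state.Done.WaitOne(); // a real server would keep working instead of waiting
        return state.Received.ToString();
    }

    private static void OnRead(IAsyncResult ar)
    {
        ReadState state = (ReadState)ar.AsyncState;
        try
        {
            int bytesRead = state.Source.EndRead(ar);
            if (bytesRead == 0)
            {
                state.Done.Set(); // end of stream: no more data will arrive
                return;
            }

            state.Received.Append(Encoding.ASCII.GetString(state.Buffer, 0, bytesRead));

            // Post the next read with the same state object.
            state.Source.BeginRead(state.Buffer, 0, state.Buffer.Length, OnRead, state);
        }
        catch (Exception)
        {
            // This sketch only stops on an error; real code should report it.
            state.Done.Set();
        }
    }
}
```

For a quick check without a network connection, AsyncStateSketch.ReadAll(new MemoryStream(Encoding.ASCII.GetBytes("hello world"))) should return the original text.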

Once your delegate method is defined, you can start the asynchronous operation using the delegate method. The following code fragment demonstrates how to use the ProcessReceiveResults delegate to handle the asynchronous result of a BeginRead() operation on a network stream:

 

C#

 

AsyncCallback AsyncReceiveCallback = new AsyncCallback(ProcessReceiveResults);

 

MyNetworkStream.BeginRead(Buffer, 0, Buffer.Length, AsyncReceiveCallback, MyNetworkStream);

 

Visual Basic .NET

 

Dim AsyncReceiveCallback As AsyncCallback = New AsyncCallback(AddressOf ProcessReceiveResults)

MyNetworkStream.BeginRead(Buffer, 0, Buffer.Length, AsyncReceiveCallback, MyNetworkStream)

 

When a BeginXXX() starter method is called, it returns immediately with an IAsyncResult object. Your calling program can call the EndXXX() counterpart method on the returned IAsyncResult, provided the delegate method does not call it as well. If the asynchronous operation has not completed, the EndXXX() method will block until the operation has completed. Typically, most applications will handle the completed results in the delegate method instead of in the calling program thread. As you can see in the previous code fragment, we did not retrieve the returned IAsyncResult from the BeginRead() call. Instead, we allowed our ProcessReceiveResults delegate method to handle the completed results.
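The alternative mentioned above, where the calling thread keeps the IAsyncResult and calls EndXXX() itself, can be sketched in C# as follows. No callback delegate is supplied, so EndRead() is called only once. The names BlockingEndSketch and ReadOnce are assumptions made for this example.

```csharp
using System;
using System.IO;

// Hypothetical helper used only for this illustration.
public static class BlockingEndSketch
{
    // Starts one asynchronous read and completes it on the calling thread.
    public static int ReadOnce(Stream source, byte[] buffer)
    {
        // No callback and no state object: the caller keeps the IAsyncResult instead.
        IAsyncResult ar = source.BeginRead(buffer, 0, buffer.Length, null, null);

        // Other work can run here while the read is in progress.

        // EndRead() blocks until the read has completed, then returns the byte count.
        return source.EndRead(ar);
    }
}
```

For example, reading a three-byte MemoryStream into a 16-byte buffer with ReadOnce() should return 3.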

The following program examples, AsyncNetworkIO and ThreadNetworkIO, are revisions of the network stream server sample presented in Chapter 2. These revised servers are designed to handle multiple network connections and demonstrate how to use the NetworkStream class to perform network I/O asynchronously. AsyncNetworkIO uses the asynchronous pattern, and ThreadNetworkIO uses threads. The original network stream server sample accepted only one connection and processed I/O synchronously on that connection. Because these applications process I/O asynchronously, they are capable of handling multiple network connections at the same time.

 

C++ Asynchronous I/O Program Example

 

Create a new CLR console application project. You might want to use AsyncNetworkIOCP for the project and solution names, as shown below.

 

C++ Asynchronous I/O Program Example - a new CLR console mode application project creation

 

Add the following code. You may discard the comments.

 

// AsyncNetworkIOCP.cpp : main project file.
// <summary>
// This sample is a revision of the NetworkStream Server sample in
// Chapter 02. This server is designed to accept multiple connections
// and demonstrates how to use the NetworkStream class to perform IO between
// 2 sockets. This application performs IO asynchronously by using the BeginXXX
// and EndXXX asynchronous IO pattern. The original sample only accepted 1
// connection and processed IO synchronously. Because this application processes
// IO asynchronously, the application is capable of handling multiple connections
// at the same time.
//
// To run this sample, simply run the program without parameters and
// it will listen for a client connection on TCP port 5150. If you want to
// use a different port than 5150 then you can optionally supply a command
// line parameter "/port <port number>" and the listening socket will use
// your port instead.
// </summary>

#include "stdafx.h"

using namespace System;
using namespace System::Net;
using namespace System::Net::Sockets;

 

public ref class NetworkStreamHandler
{
    // Per-connection state. These are instance members, initialized in the
    // constructor, so that each connection gets its own buffer, network
    // stream and connection number instead of sharing one static copy.
    array< Byte >^ Buffer;
    AsyncCallback^ AsyncReceiveCallback;
    // The network stream used to communicate over the connected Socket.
    NetworkStream^ ServerNetworkStream;
    int ConnectionNumber;

public:
    NetworkStreamHandler(Socket^ ServerSocket, int ConnectionID)
    {
        ConnectionNumber = ConnectionID;
        Buffer = gcnew array< Byte >(4096);
        AsyncReceiveCallback = gcnew AsyncCallback(&NetworkStreamHandler::ProcessReceiveResults);
        ServerNetworkStream = nullptr;

        try
        {
            // Setup a network stream on the server Socket
            Console::WriteLine("Setting up a network stream on the server Socket...");
            ServerNetworkStream = gcnew NetworkStream(ServerSocket, true);
            Console::WriteLine("NetworkStream() is OK...");
            // Pass this handler as the state object so the callback can find its connection
            ServerNetworkStream->BeginRead(Buffer, 0, Buffer->Length, AsyncReceiveCallback, this);
            Console::WriteLine("BeginRead() is OK...");
        }
        catch (Exception^ e)
        {
            Console::WriteLine(e->Message);
        }
    }

    static void ProcessReceiveResults(IAsyncResult^ ar)
    {
        NetworkStreamHandler^ NSH = (NetworkStreamHandler^)ar->AsyncState;

        try
        {
            int BytesRead = 0;

            Console::WriteLine("Receiving and reading data...");
            BytesRead = NSH->ServerNetworkStream->EndRead(ar);
            Console::WriteLine("Connection #" + NSH->ConnectionNumber.ToString() + " received " + BytesRead.ToString() + " byte(s).");

            if (BytesRead == 0)
            {
                Console::WriteLine("Connection #" + NSH->ConnectionNumber.ToString() + " is closing.");
                NSH->ServerNetworkStream->Close();
                // Do not post another read on a closed stream
                return;
            }

            // Post the next asynchronous read on this connection
            NSH->ServerNetworkStream->BeginRead(NSH->Buffer, 0, NSH->Buffer->Length, NSH->AsyncReceiveCallback, NSH);
        }
        catch (Exception^ e)
        {
            Console::WriteLine("Failed to read from a network stream with error: " + e->Message);
            NSH->ServerNetworkStream->Close();
        }
    }
};

 

int main(array<System::String ^> ^args)
{
    // Default port number
    int Port = 5150;

    // Parse the optional "/port <port number>" command-line arguments.
    // Running the program without arguments uses the default port.
    if (args->Length != 0 && args->Length != 2)
    {
        Console::WriteLine("Usage: {0} [/port <port number>]", Environment::CommandLine);
        return 0;
    }

    for (int i = 0; i < args->Length; i++)
    {
        try
        {
            if (String::Compare(args[i], "/port", true) == 0)
            {
                // The port on which the server is listening
                Port = Convert::ToInt32(args[++i]);
            }
        }
        catch (IndexOutOfRangeException^)
        {
            Console::WriteLine("Usage: {0} [/port <port number>]", Environment::CommandLine);
            return 0;
        }
        catch (FormatException^)
        {
            // The port value was not a number
            Console::WriteLine("Usage: {0} [/port <port number>]", Environment::CommandLine);
            return 0;
        }
    }

    Socket^ ServerSocket = nullptr;
    Socket^ ListeningSocket = nullptr;

    try
    {
        // Setup a listening Socket to await a connection from a peer socket.
        ListeningSocket = gcnew Socket(AddressFamily::InterNetwork, SocketType::Stream, ProtocolType::Tcp);
        Console::WriteLine("Socket() is OK...");
        IPEndPoint^ ListeningEndPoint = gcnew IPEndPoint(IPAddress::Any, Port);
        Console::WriteLine("IPEndPoint() is OK...");
        ListeningSocket->Bind(ListeningEndPoint);
        Console::WriteLine("Bind() is OK...");
        ListeningSocket->Listen(5);
        Console::WriteLine("Listen() is OK...");

        Console::WriteLine("Listening and awaiting a TCP connection on IP: "
            + ListeningEndPoint->Address->ToString()
            + " Port: "
            + ListeningEndPoint->Port.ToString()
            + "...");

        int ConnectionCount = 0;

        // Accept connections until an error occurs; each handler posts its own asynchronous reads.
        while (true)
        {
            ServerSocket = ListeningSocket->Accept();
            ConnectionCount++;
            Console::WriteLine("Connection #" + ConnectionCount.ToString() + " is established and awaiting data...");
            NetworkStreamHandler^ NSH = gcnew NetworkStreamHandler(ServerSocket, ConnectionCount);
        }
    }
    catch (SocketException^ e)
    {
        Console::WriteLine("Failure to create Sockets: " + e->Message);
        return 0;
    }
    finally
    {
        // Close the listening socket - we do not plan to handle any additional connections.
        Console::WriteLine("Closing the listening socket...");
        // The socket is still nullptr if its construction failed
        if (ListeningSocket != nullptr)
            ListeningSocket->Close();
    }

    return 0;
}

 

Build and run this project. If the Windows firewall displays a Windows Security Alert, unblock the program.

 

C++ Asynchronous I/O Program Example - unblocking the Windows personal firewall protection  

 

The following are the sample outputs. The program is waiting for a connection from a sender.

 

C++ Asynchronous I/O Program Example - a sample output in action waiting for connection from client

 

Then, while the program is waiting for a connection, let’s test it with the sender programs. In this case we use the networkstreamsamplecpsender, networkstreamsamplevbsender and networkstreamsamplecssender programs that were created previously to send some messages, with the port number set to 5687 and using localhost. The receiver must be listening on the same port, so start it with /port 5687 rather than the default 5150. Sample outputs of the senders are shown below.

 

C++ Asynchronous I/O Program Example - sender program in action

 

C++ Asynchronous I/O Program Example - sender program output when communication was completed

 

The following screenshot shows the output for the receiver when all the communication was completed successfully.

 

C++ Asynchronous I/O Program Example - receiver program output when communication was completed

 

 

 


 
