< Overlapped I/O with Callback Example - AcceptEx() | Winsock2 I/O Method Main | Completion Port & Overlapped I/O >


 

 

Winsock 2 I/O Methods 5 Part 10

 

 

What do we have in this chapter 5 part 10?

  1. The Completion Port Model

  2. Worker Threads and Completion Ports

  3. The Completion Port Model Program Example

 

The Completion Port Model

 

For newcomers, the completion port model seems overwhelmingly complicated because extra work is required to add sockets to a completion port when compared to the initialization steps for the other I/O models. However, as you will see, these steps are not that complicated once you understand them. Also, the completion port model offers the best system performance possible when an application has to manage many sockets at once. Unfortunately, it's available only on Windows NT, Windows 2000, and Windows XP; however, the completion port model offers the best scalability of all the models discussed so far. This model is well suited to handling hundreds or thousands of sockets.

Essentially, the completion port model requires you to create a Windows completion port object that will manage overlapped I/O requests using a specified number of threads to service the completed overlapped I/O requests. Note that a completion port is actually a Windows I/O construct that is capable of accepting more than just socket handles. However, this section will describe only how to take advantage of the completion port model by using socket handles. To begin using this model, you are required to create an I/O completion port object that will be used to manage multiple I/O requests for any number of socket handles. This is accomplished by calling the CreateIoCompletionPort() function, which is defined as:

 

HANDLE CreateIoCompletionPort(

    HANDLE FileHandle,

    HANDLE ExistingCompletionPort,

    DWORD CompletionKey,

    DWORD NumberOfConcurrentThreads

);

 

Before examining the parameters in detail, be aware that this function is actually used for two distinct purposes:

 

  1. To create a completion port object.
  2. To associate a handle with a completion port.

 

When you initially create a completion port object, the only parameter of interest is NumberOfConcurrentThreads; the first three parameters are not significant. The NumberOfConcurrentThreads parameter is special because it defines the number of threads that are allowed to execute concurrently on a completion port. Ideally, you want only one thread per processor to service the completion port to avoid thread context switching. The value 0 for this parameter tells the system to allow as many threads as there are processors in the system. The following code creates an I/O completion port.

 

 CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);

 

This will return a handle that is used to identify the completion port when a socket handle is assigned to it.

 

Worker Threads and Completion Ports

 

After a completion port is successfully created, you can begin to associate socket handles with the object. Before associating sockets, though, you have to create one or more worker threads to service the completion port when socket I/O requests are posted to the completion port object. At this point, you might wonder how many threads should be created to service the completion port. This is actually one of the more complicated aspects of the completion port model because the number needed to service I/O requests depends on the overall design of your application. It's important to note the distinction between number of concurrent threads to specify when calling CreateIoCompletionPort() versus the number of worker threads to create; they do not represent the same thing. We recommended previously that you should have the CreateIoCompletionPort() function specify one thread per processor to avoid thread context switching. The NumberOfConcurrentThreads() parameter of CreateIoCompletionPort() explicitly tells the system to allow only n threads to operate at a time on the completion port. If you create more than n worker threads on the completion port, only n threads will be allowed to operate at a time. (Actually, the system might exceed this value for a short amount of time, but the system will quickly bring it down to the value you specify in CreateIoCompletionPort().) You might be wondering why you would create more worker threads than the number specified by the CreateIoCompletionPort() call. As we mentioned previously, this depends on the overall design of your application. If one of your worker threads calls a function, such as Sleep() or WaitForSingleObject() and becomes suspended, another thread will be allowed to operate in its place. In other words, you always want to have as many threads available for execution as the number of threads you allow to execute in the CreateIoCompletionPort() call. Thus, if you expect your worker thread to ever become blocked, it is reasonable to create more worker threads than the value specified in CreateIoCompletionPort()'s NumberOfConcurrentThreads() parameter.

Once you have enough worker threads to service I/O requests on the completion port, you can begin to associate socket handles with the completion port. This requires calling the CreateIoCompletionPort() function on an existing completion port and supplying the first three parameters, FileHandle, ExistingCompletionPort, and CompletionKey, with socket information. The FileHandle parameter represents a socket handle to associate with the completion port. The ExistingCompletionPort parameter identifies the completion port to which the socket handle is to be associated with. The CompletionKey parameter identifies per-handle data that you can associate with a particular socket handle. Applications are free to store any type of information associated with a socket by using this key. We call it per-handle data because it represents data associated with a socket handle. It is useful to store the socket handle using the key as a pointer to a data structure containing the socket handle and other socket-specific information. As we will see later in this chapter, the thread routines that service the completion port can retrieve socket-handle–specific information using this key.

Let's begin to construct a basic application framework from what we've described so far. The following example demonstrates how to start developing an echo server application using the completion port model. In this code, we take the following preparation steps:

 

  1. Create a completion port. The fourth parameter is left as 0, specifying that only one worker thread per processor will be allowed to execute at a time on the completion port.
  2. Determine how many processors exist on the system.
  3. Create worker threads to service completed I/O requests on the completion port using processor information in step 2. In the case of this simple example, we create one worker thread per processor because we do not expect our threads to ever get in a suspended condition in which there would not be enough threads to execute for each processor. When the CreateThread() function is called, you must supply a worker routine that the thread executes upon creation. We will discuss the worker thread's responsibilities later in this section.
  4. Prepare a listening socket to listen for connections on port 5150.
  5. Accept inbound connections using the accept function.
  6. Create a data structure to represent per-handle data and save the accepted socket handle in the structure.
  7. Associate the new socket handle returned from accept with the completion port by calling CreateIoCompletionPort(). Pass the per-handle data structure to CreateIoCompletionPort() via the completion key parameter.
  8. Start processing I/O on the accepted connection. Essentially, you want to post one or more asynchronous WSARecv() or WSASend() requests on the new socket using the overlapped I/O mechanism. When these I/O requests complete, a worker thread services the I/O requests and continues processing future I/O requests, as we will see later in the worker routine specified in step 3.
  9. Repeat steps 5-8 until server terminates.

 

 

 

 

HANDLE CompletionPort;

WSADATA wsd;

SYSTEM_INFO SystemInfo;

SOCKADDR_IN InternetAddr;

SOCKET Listen;

int i;

 

typedef struct _PER_HANDLE_DATA

{

            SOCKET        Socket;

            SOCKADDR_STORAGE  ClientAddr;

            // Other information useful to be associated with the handle

} PER_HANDLE_DATA, * LPPER_HANDLE_DATA;

 

// Load Winsock

StartWinsock(MAKEWORD(2,2), &wsd);

 

// Step 1:

// Create an I/O completion port

CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);

 

// Step 2:

// Determine how many processors are on the system

GetSystemInfo(&SystemInfo);

 

// Step 3:

// Create worker threads based on the number of

// processors available on the system. For this

// simple case, we create one worker thread for each processor.

for(i = 0; i < SystemInfo.dwNumberOfProcessors; i++)

{

    HANDLE ThreadHandle;

 

    // Create a server worker thread, and pass the

    // completion port to the thread. NOTE: the

    // ServerWorkerThread procedure is not defined in this listing.

    ThreadHandle = CreateThread(NULL, 0, ServerWorkerThread, CompletionPort, 0, NULL;

    // Close the thread handle

    CloseHandle(ThreadHandle);

}

 

// Step 4:

// Create a listening socket

Listen = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);

 

InternetAddr.sin_family = AF_INET;

InternetAddr.sin_addr.s_addr = htonl(INADDR_ANY);

InternetAddr.sin_port = htons(5150);

bind(Listen, (PSOCKADDR) &InternetAddr, sizeof(InternetAddr));

 

// Prepare socket for listening

listen(Listen, 5);

 

while(TRUE)

{

    PER_HANDLE_DATA *PerHandleData=NULL;

    SOCKADDR_IN saRemote;

    SOCKET Accept;

    int RemoteLen;

 

    // Step 5:

    // Accept connections and assign to the completion port

    RemoteLen = sizeof(saRemote);

    Accept = WSAAccept(Listen, (SOCKADDR *)&saRemote, &RemoteLen);

 

    // Step 6:

    // Create per-handle data information structure to associate with the socket

    PerHandleData = (LPPER_HANDLE_DATA)GlobalAlloc(GPTR, sizeof(PER_HANDLE_DATA));

 

    printf("Socket number %d connected\n", Accept);

    PerHandleData->Socket = Accept;

    memcpy(&PerHandleData->ClientAddr, &saRemote, RemoteLen);

 

    // Step 7:

    // Associate the accepted socket with the completion port

    CreateIoCompletionPort((HANDLE) Accept, CompletionPort, (DWORD) PerHandleData, 0);

 

    // Step 8:

    //  Start processing I/O on the accepted socket.

    //  Post one or more WSASend() or WSARecv() calls on the socket using overlapped I/O.

    WSARecv(...);

}

 

DWORD WINAPI ServerWorkerThread(LPVOID lpParam)

{

            // The requirements for the worker thread will be discussed later.

}

return 0;

}

 

The Completion Port Model Program Example

 

The following program example tries to demonstrate the server/receiver that implements completion port model. Create a new empty Win32 console mode application and add the project/solution name.

 

The Completion Port Model: program example tries to demonstrate the server/receiver that implements completion port model. Creating a new empty Win32 console mode application and add the project/solution name.

 

Add the following source code.

 

// Description:

//

//    This sample illustrates how to develop a simple echo server Winsock

//    application using the completeion port I/O model. This

//    sample is implemented as a console-style application and simply prints

//    messages when connections are established and removed from the server.

//    The application listens for TCP connections on port 5150 and accepts them

//    as they arrive. When this application receives data from a client, it

//    simply echos (this is why we call it an echo server) the data back in

//    it's original form until the client closes the connection.

//

//    Note: There are no command line options for this sample.

//

// Link to ws2_32.lib

#include <winsock2.h>

#include <windows.h>

#include <stdio.h>

 

#define PORT 5150

#define DATA_BUFSIZE 8192

 

// Typedef definition

typedef struct

{

   OVERLAPPED Overlapped;

   WSABUF DataBuf;

   CHAR Buffer[DATA_BUFSIZE];

   DWORD BytesSEND;

   DWORD BytesRECV;

} PER_IO_OPERATION_DATA, * LPPER_IO_OPERATION_DATA;

 

// Structure definition

typedef struct

{

   SOCKET Socket;

} PER_HANDLE_DATA, * LPPER_HANDLE_DATA;

 

// Prototype

DWORD WINAPI ServerWorkerThread(LPVOID CompletionPortID);

 

int main(int argc, char **argv)

{

            SOCKADDR_IN InternetAddr;

            SOCKET Listen;

            HANDLE ThreadHandle;

            SOCKET Accept;

            HANDLE CompletionPort;

            SYSTEM_INFO SystemInfo;

            LPPER_HANDLE_DATA PerHandleData;

            LPPER_IO_OPERATION_DATA PerIoData;

            int i;

            DWORD RecvBytes;

            DWORD Flags;

            DWORD ThreadID;

            WSADATA wsaData;

            DWORD Ret;

 

            if ((Ret = WSAStartup((2,2), &wsaData)) != 0)

            {

                        printf("WSAStartup() failed with error %d\n", Ret);

                        return 1;

            }

            else

                        printf("WSAStartup() is OK!\n");

 

            // Setup an I/O completion port

            if ((CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0)) == NULL)

            {

                        printf("CreateIoCompletionPort() failed with error %d\n", GetLastError());

                        return 1;

            }

            else

                        printf("CreateIoCompletionPort() is damn OK!\n");

 

 

            // Determine how many processors are on the system

            GetSystemInfo(&SystemInfo);

            // Create worker threads based on the number of processors available on the

            // system. Create two worker threads for each processor

            for(i = 0; i < (int)SystemInfo.dwNumberOfProcessors * 2; i++)

            {

                        // Create a server worker thread and pass the completion port to the thread

                        if ((ThreadHandle = CreateThread(NULL, 0, ServerWorkerThread, CompletionPort,  0, &ThreadID)) == NULL)

                        {

                                    printf("CreateThread() failed with error %d\n", GetLastError());

                                    return 1;

                        }

                        else

                                    printf("CreateThread() is OK!\n");

                        // Close the thread handle

                        CloseHandle(ThreadHandle);

            }

 

            // Create a listening socket

            if ((Listen = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED)) == INVALID_SOCKET)

            {

                        printf("WSASocket() failed with error %d\n", WSAGetLastError());

                        return 1;

            }

            else

                        printf("WSASocket() is OK!\n");

 

            InternetAddr.sin_family = AF_INET;

            InternetAddr.sin_addr.s_addr = htonl(INADDR_ANY);

            InternetAddr.sin_port = htons(PORT);

 

            if (bind(Listen, (PSOCKADDR) &InternetAddr, sizeof(InternetAddr)) == SOCKET_ERROR)

            {

                        printf("bind() failed with error %d\n", WSAGetLastError());

                        return 1;

            }

            else

                        printf("bind() is fine!\n");

 

            // Prepare socket for listening

            if (listen(Listen, 5) == SOCKET_ERROR)

            {

                        printf("listen() failed with error %d\n", WSAGetLastError());

                        return 1;

            }

            else

                        printf("listen() is working...\n");

 

            // Accept connections and assign to the completion port

            while(TRUE)

            {

                        if ((Accept = WSAAccept(Listen, NULL, NULL, NULL, 0)) == SOCKET_ERROR)

                        {

                                    printf("WSAAccept() failed with error %d\n", WSAGetLastError());

                                    return 1;

                        }

                        else

                                    printf("WSAAccept() looks fine!\n");

 

                        // Create a socket information structure to associate with the socket

                        if ((PerHandleData = (LPPER_HANDLE_DATA) GlobalAlloc(GPTR, sizeof(PER_HANDLE_DATA))) == NULL)

                                    printf("GlobalAlloc() failed with error %d\n", GetLastError());

                        else

                                    printf("GlobalAlloc() for LPPER_HANDLE_DATA is OK!\n");

                        return 1;

            }

 

            // Associate the accepted socket with the original completion port

            printf("Socket number %d got connected...\n", Accept);

            PerHandleData->Socket = Accept;

 

            if (CreateIoCompletionPort((HANDLE) Accept, CompletionPort, (DWORD) PerHandleData, 0) == NULL)

            {

                        printf("CreateIoCompletionPort() failed with error %d\n", GetLastError());

                        return 1;

            }

            else

                        printf("CreateIoCompletionPort() is OK!\n");

 

            // Create per I/O socket information structure to associate with the WSARecv call below

            if ((PerIoData = (LPPER_IO_OPERATION_DATA) GlobalAlloc(GPTR, sizeof(PER_IO_OPERATION_DATA))) == NULL)

            {

                        printf("GlobalAlloc() failed with error %d\n", GetLastError());

                        return 1;

            }

            else

                        printf("GlobalAlloc() for LPPER_IO_OPERATION_DATA is OK!\n");

 

            ZeroMemory(&(PerIoData->Overlapped), sizeof(OVERLAPPED));

            PerIoData->BytesSEND = 0;

            PerIoData->BytesRECV = 0;

            PerIoData->DataBuf.len = DATA_BUFSIZE;

            PerIoData->DataBuf.buf = PerIoData->Buffer;

 

            Flags = 0;

            if (WSARecv(Accept, &(PerIoData->DataBuf), 1, &RecvBytes, &Flags, &(PerIoData->Overlapped), NULL) == SOCKET_ERROR)

            {

                        if (WSAGetLastError() != ERROR_IO_PENDING)

                        {

                                    printf("WSARecv() failed with error %d\n", WSAGetLastError());

                                    return 1;

                        }

            }

            else

                        printf("WSARecv() is OK!\n");

}

 

 

 

 

DWORD WINAPI ServerWorkerThread(LPVOID CompletionPortID)

{

            HANDLE CompletionPort = (HANDLE) CompletionPortID;

            DWORD BytesTransferred;

            LPPER_HANDLE_DATA PerHandleData;

            LPPER_IO_OPERATION_DATA PerIoData;

            DWORD SendBytes, RecvBytes;

            DWORD Flags;

 

            while(TRUE)

            {

                        if (GetQueuedCompletionStatus(CompletionPort, &BytesTransferred,

                                    (LPDWORD)&PerHandleData, (LPOVERLAPPED *) &PerIoData, INFINITE) == 0)

                        {

                                    printf("GetQueuedCompletionStatus() failed with error %d\n", GetLastError());

                                    return 0;

                        }

                        else

                                    printf("GetQueuedCompletionStatus() is OK!\n");

 

                        // First check to see if an error has occurred on the socket and if so

                        // then close the socket and cleanup the SOCKET_INFORMATION structure

                        // associated with the socket

                        if (BytesTransferred == 0)

                        {

                                    printf("Closing socket %d\n", PerHandleData->Socket);

                                    if (closesocket(PerHandleData->Socket) == SOCKET_ERROR)

                                    {

                                                printf("closesocket() failed with error %d\n", WSAGetLastError());

                                                return 0;

                                    }

                                    else

                                                printf("closesocket() is fine!\n");

 

                                    GlobalFree(PerHandleData);

                                    GlobalFree(PerIoData);

                                    continue;

                        }

 

                        // Check to see if the BytesRECV field equals zero. If this is so, then

                        // this means a WSARecv call just completed so update the BytesRECV field

                        // with the BytesTransferred value from the completed WSARecv() call

                        if (PerIoData->BytesRECV == 0)

                        {

                                    PerIoData->BytesRECV = BytesTransferred;

                                    PerIoData->BytesSEND = 0;

                        }

                        else

                        {

                                    PerIoData->BytesSEND += BytesTransferred;

                        }

 

                        if (PerIoData->BytesRECV > PerIoData->BytesSEND)

                        {

                                    // Post another WSASend() request.

                                    // Since WSASend() is not guaranteed to send all of the bytes requested,

                                    // continue posting WSASend() calls until all received bytes are sent.

                                    ZeroMemory(&(PerIoData->Overlapped), sizeof(OVERLAPPED));

                                    PerIoData->DataBuf.buf = PerIoData->Buffer + PerIoData->BytesSEND;

                                    PerIoData->DataBuf.len = PerIoData->BytesRECV - PerIoData->BytesSEND;

 

                                    if (WSASend(PerHandleData->Socket, &(PerIoData->DataBuf), 1, &SendBytes, 0,

                                                &(PerIoData->Overlapped), NULL) == SOCKET_ERROR)

                                    {

                                                if (WSAGetLastError() != ERROR_IO_PENDING)

                                                {

                                                            printf("WSASend() failed with error %d\n", WSAGetLastError());

                                                            return 0;

                                                }

                                    }

                                    else

                                                printf("WSASend() is OK!\n");

                        }

                        else

                        {

                                    PerIoData->BytesRECV = 0;

                                    // Now that there are no more bytes to send post another WSARecv() request

                                    Flags = 0;

                                    ZeroMemory(&(PerIoData->Overlapped), sizeof(OVERLAPPED));

                                    PerIoData->DataBuf.len = DATA_BUFSIZE;

                                    PerIoData->DataBuf.buf = PerIoData->Buffer;

 

                                   if (WSARecv(PerHandleData->Socket, &(PerIoData->DataBuf), 1, &RecvBytes, &Flags,

                                                &(PerIoData->Overlapped), NULL) == SOCKET_ERROR)

                                    {

                                                if (WSAGetLastError() != ERROR_IO_PENDING)

                                                {

                                                            printf("WSARecv() failed with error %d\n", WSAGetLastError());

                                                            return 0;

                                                }

                                    }

                                    else

                                                printf("WSARecv() is OK!\n");

                        }

            }

}

 

Build and run the project.

 

The Completion Port Model: program example tries to demonstrate the server/receiver that implements completion port model. Running the program showing a sample output

 

 

 


< Overlapped I/O with Callback Example - AcceptEx() | Winsock2 I/O Method Main | Completion Port & Overlapped I/O >