What do we have in this chapter 5 part 10?
The Completion Port Model
For newcomers, the completion port model seems overwhelmingly complicated because adding sockets to a completion port takes more work than the initialization steps of the other I/O models. However, as you will see, these steps are not that complicated once you understand them. The completion port model offers the best scalability of all the models discussed so far and the best system performance possible when an application has to manage many sockets at once; it is well suited to handling hundreds or thousands of sockets. Its one drawback is availability: it exists only on the Windows NT family (Windows NT, Windows 2000, Windows XP, and later).
Essentially, the completion port model requires you to create a Windows completion port object that manages overlapped I/O requests, using a specified number of threads to service the completed requests. Note that a completion port is actually a Windows I/O construct that can accept more than just socket handles; this section, however, describes only how to use it with socket handles. To begin using this model, you create an I/O completion port object that will manage multiple I/O requests for any number of socket handles. This is accomplished by calling the CreateIoCompletionPort() function, which is defined as:
HANDLE CreateIoCompletionPort( HANDLE FileHandle, HANDLE ExistingCompletionPort, ULONG_PTR CompletionKey, DWORD NumberOfConcurrentThreads );
Before examining the parameters in detail, be aware that this function is actually used for two distinct purposes: to create a completion port object, and to associate a handle with an existing completion port.
When you initially create a completion port object, the only parameter of interest is NumberOfConcurrentThreads; the first three parameters are not significant. The NumberOfConcurrentThreads parameter is special because it defines the number of threads that are allowed to execute concurrently on a completion port. Ideally, you want only one thread per processor to service the completion port to avoid thread context switching. The value 0 for this parameter tells the system to allow as many threads as there are processors in the system. The following code creates an I/O completion port.
CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
This will return a handle that is used to identify the completion port when a socket handle is assigned to it.
After a completion port is successfully created, you can begin to associate socket handles with the object. Before associating sockets, though, you have to create one or more worker threads to service the completion port when socket I/O requests are posted to it. At this point, you might wonder how many threads should be created. This is actually one of the more complicated aspects of the completion port model because the number needed to service I/O requests depends on the overall design of your application. It's important to note the distinction between the number of concurrent threads you specify when calling CreateIoCompletionPort() and the number of worker threads you create; they do not represent the same thing. We recommended previously that the CreateIoCompletionPort() call specify one thread per processor to avoid thread context switching. The NumberOfConcurrentThreads parameter of CreateIoCompletionPort() explicitly tells the system to allow only n threads to operate at a time on the completion port. If you create more than n worker threads on the completion port, only n threads will be allowed to operate at a time. (Actually, the system might exceed this value for a short amount of time, but it will quickly bring the count back down to the value you specify in CreateIoCompletionPort().)
You might be wondering why you would create more worker threads than the number specified in the CreateIoCompletionPort() call. As we mentioned previously, this depends on the overall design of your application. If one of your worker threads calls a function such as Sleep() or WaitForSingleObject() and becomes suspended, another thread will be allowed to operate in its place. In other words, you always want to have as many threads available for execution as the number of threads you allow to execute in the CreateIoCompletionPort() call. Thus, if you expect your worker threads to ever become blocked, it is reasonable to create more worker threads than the value specified in CreateIoCompletionPort()'s NumberOfConcurrentThreads parameter.
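The relationship between the concurrency value and the number of worker threads can be modelled with a little arithmetic. The following is only a sketch of the rule described above in portable C, not a call into the Windows API; the function names effective_concurrency and runnable_workers are hypothetical, and the model ignores the brief overshoot the system may allow.

```c
#include <assert.h>

/* Concurrency limit the port would use: a requested value of 0 means
   "as many threads as there are processors". */
unsigned effective_concurrency(unsigned requested, unsigned processors)
{
    return requested == 0 ? processors : requested;
}

/* Number of worker threads that can be processing completions right now.
   Blocked workers (for example, inside Sleep()) do not count, and the
   port never lets more than `limit` workers run at once. */
unsigned runnable_workers(unsigned limit, unsigned workers, unsigned blocked)
{
    unsigned available;

    assert(blocked <= workers);
    available = workers - blocked;
    return available < limit ? available : limit;
}
```

With four processors and exactly four workers, one blocked worker leaves only three running: runnable_workers(4, 4, 1) is 3. With eight workers, the port can still keep four busy: runnable_workers(4, 8, 1) is 4. This is the reason for creating more workers than the concurrency value when workers may block.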
Once you have enough worker threads to service I/O requests on the completion port, you can begin to associate socket handles with the completion port. This requires calling the CreateIoCompletionPort() function on an existing completion port and supplying the first three parameters, FileHandle, ExistingCompletionPort, and CompletionKey, with socket information. The FileHandle parameter represents a socket handle to associate with the completion port. The ExistingCompletionPort parameter identifies the completion port with which the socket handle is to be associated. The CompletionKey parameter identifies per-handle data that you can associate with a particular socket handle. Applications are free to store any type of information associated with a socket by using this key. We call it per-handle data because it represents data associated with a socket handle. A useful approach is to pass, as the key, a pointer to a data structure that contains the socket handle and other socket-specific information. As we will see later in this chapter, the thread routines that service the completion port can retrieve socket-handle–specific information using this key.
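Because the completion key is just a pointer-sized integer, per-handle data is passed by casting a pointer to the key type and casting it back in the worker thread. The sketch below shows that round trip in portable C under a few assumptions: uintptr_t stands in for the Windows ULONG_PTR type, handle_data_t is a simplified stand-in for a per-handle structure, and new_handle_data, make_key, and from_key are hypothetical helper names.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/* Simplified stand-in for per-handle data; a real one would hold a SOCKET. */
typedef struct {
    int  socket_id;
    char peer[32];
} handle_data_t;

/* Allocate zeroed per-handle data for one connection (NULL on failure). */
handle_data_t *new_handle_data(int socket_id)
{
    handle_data_t *data = calloc(1, sizeof *data);

    if (data != NULL)
        data->socket_id = socket_id;
    return data;
}

/* Pack the pointer into a pointer-sized key (ULONG_PTR on Windows). */
uintptr_t make_key(handle_data_t *data)
{
    assert(data != NULL);
    return (uintptr_t)data;
}

/* Recover the pointer from the key handed back to a worker thread. */
handle_data_t *from_key(uintptr_t key)
{
    return (handle_data_t *)key;
}
```

On 64-bit Windows a pointer does not fit in a 32-bit DWORD, which is why the key type has to be pointer-sized; casting the pointer to a narrower type would discard its upper bits.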
Let's begin to construct a basic application framework from what we've described so far. The following example demonstrates how to start developing an echo server application using the completion port model. In this code, we take the preparation steps marked by the numbered comments, Step 1 through Step 8:
HANDLE CompletionPort;
WSADATA wsd;
SYSTEM_INFO SystemInfo;
SOCKADDR_IN InternetAddr;
SOCKET Listen;
int i;
typedef struct _PER_HANDLE_DATA
{
SOCKET Socket;
SOCKADDR_STORAGE ClientAddr;
// Other information useful to be associated with the handle
} PER_HANDLE_DATA, * LPPER_HANDLE_DATA;
// Load Winsock
WSAStartup(MAKEWORD(2,2), &wsd);
// Step 1:
// Create an I/O completion port
CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
// Step 2:
// Determine how many processors are on the system
GetSystemInfo(&SystemInfo);
// Step 3:
// Create worker threads based on the number of
// processors available on the system. For this
// simple case, we create one worker thread for each processor.
for(i = 0; i < SystemInfo.dwNumberOfProcessors; i++)
{
HANDLE ThreadHandle;
// Create a server worker thread, and pass the
// completion port to the thread. NOTE: the
// ServerWorkerThread procedure is not defined in this listing.
ThreadHandle = CreateThread(NULL, 0, ServerWorkerThread, CompletionPort, 0, NULL);
// Close the thread handle
CloseHandle(ThreadHandle);
}
// Step 4:
// Create a listening socket
Listen = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
InternetAddr.sin_family = AF_INET;
InternetAddr.sin_addr.s_addr = htonl(INADDR_ANY);
InternetAddr.sin_port = htons(5150);
bind(Listen, (PSOCKADDR) &InternetAddr, sizeof(InternetAddr));
// Prepare socket for listening
listen(Listen, 5);
while(TRUE)
{
PER_HANDLE_DATA *PerHandleData=NULL;
SOCKADDR_IN saRemote;
SOCKET Accept;
int RemoteLen;
// Step 5:
// Accept connections and assign to the completion port
RemoteLen = sizeof(saRemote);
Accept = WSAAccept(Listen, (SOCKADDR *)&saRemote, &RemoteLen, NULL, 0);
// Step 6:
// Create per-handle data information structure to associate with the socket
PerHandleData = (LPPER_HANDLE_DATA)GlobalAlloc(GPTR, sizeof(PER_HANDLE_DATA));
printf("Socket number %d connected\n", Accept);
PerHandleData->Socket = Accept;
memcpy(&PerHandleData->ClientAddr, &saRemote, RemoteLen);
// Step 7:
// Associate the accepted socket with the completion port
CreateIoCompletionPort((HANDLE) Accept, CompletionPort, (ULONG_PTR) PerHandleData, 0);
// Step 8:
// Start processing I/O on the accepted socket.
// Post one or more WSASend() or WSARecv() calls on the socket using overlapped I/O.
WSARecv(...);
}
DWORD WINAPI ServerWorkerThread(LPVOID lpParam)
{
// The requirements for the worker thread will be discussed later.
return 0;
}
The following program example demonstrates a server (receiver) that implements the completion port model. Create a new empty Win32 console mode application and give the project/solution a name.
Add the following source code.
// Description:
//
// This sample illustrates how to develop a simple echo server Winsock
// application using the completion port I/O model. This
// sample is implemented as a console-style application and simply prints
// messages when connections are established and removed from the server.
// The application listens for TCP connections on port 5150 and accepts them
// as they arrive. When this application receives data from a client, it
// simply echoes (this is why we call it an echo server) the data back in
// its original form until the client closes the connection.
//
// Note: There are no command line options for this sample.
//
// Link to ws2_32.lib
#include <winsock2.h>
#include <windows.h>
#include <stdio.h>
#define PORT 5150
#define DATA_BUFSIZE 8192
// Typedef definition
typedef struct
{
OVERLAPPED Overlapped;
WSABUF DataBuf;
CHAR Buffer[DATA_BUFSIZE];
DWORD BytesSEND;
DWORD BytesRECV;
} PER_IO_OPERATION_DATA, * LPPER_IO_OPERATION_DATA;
// Structure definition
typedef struct
{
SOCKET Socket;
} PER_HANDLE_DATA, * LPPER_HANDLE_DATA;
// Prototype
DWORD WINAPI ServerWorkerThread(LPVOID CompletionPortID);
int main(int argc, char **argv)
{
SOCKADDR_IN InternetAddr;
SOCKET Listen;
HANDLE ThreadHandle;
SOCKET Accept;
HANDLE CompletionPort;
SYSTEM_INFO SystemInfo;
LPPER_HANDLE_DATA PerHandleData;
LPPER_IO_OPERATION_DATA PerIoData;
int i;
DWORD RecvBytes;
DWORD Flags;
DWORD ThreadID;
WSADATA wsaData;
DWORD Ret;
if ((Ret = WSAStartup(MAKEWORD(2,2), &wsaData)) != 0)
{
printf("WSAStartup() failed with error %d\n", Ret);
return 1;
}
else
printf("WSAStartup() is OK!\n");
// Setup an I/O completion port
if ((CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0)) == NULL)
{
printf("CreateIoCompletionPort() failed with error %d\n", GetLastError());
return 1;
}
else
printf("CreateIoCompletionPort() is OK!\n");
// Determine how many processors are on the system
GetSystemInfo(&SystemInfo);
// Create worker threads based on the number of processors available on the
// system. Create two worker threads for each processor
for(i = 0; i < (int)SystemInfo.dwNumberOfProcessors * 2; i++)
{
// Create a server worker thread and pass the completion port to the thread
if ((ThreadHandle = CreateThread(NULL, 0, ServerWorkerThread, CompletionPort, 0, &ThreadID)) == NULL)
{
printf("CreateThread() failed with error %d\n", GetLastError());
return 1;
}
else
printf("CreateThread() is OK!\n");
// Close the thread handle
CloseHandle(ThreadHandle);
}
// Create a listening socket
if ((Listen = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED)) == INVALID_SOCKET)
{
printf("WSASocket() failed with error %d\n", WSAGetLastError());
return 1;
}
else
printf("WSASocket() is OK!\n");
InternetAddr.sin_family = AF_INET;
InternetAddr.sin_addr.s_addr = htonl(INADDR_ANY);
InternetAddr.sin_port = htons(PORT);
if (bind(Listen, (PSOCKADDR) &InternetAddr, sizeof(InternetAddr)) == SOCKET_ERROR)
{
printf("bind() failed with error %d\n", WSAGetLastError());
return 1;
}
else
printf("bind() is fine!\n");
// Prepare socket for listening
if (listen(Listen, 5) == SOCKET_ERROR)
{
printf("listen() failed with error %d\n", WSAGetLastError());
return 1;
}
else
printf("listen() is working...\n");
// Accept connections and assign to the completion port
while(TRUE)
{
if ((Accept = WSAAccept(Listen, NULL, NULL, NULL, 0)) == INVALID_SOCKET)
{
printf("WSAAccept() failed with error %d\n", WSAGetLastError());
return 1;
}
else
printf("WSAAccept() looks fine!\n");
// Create a socket information structure to associate with the socket
if ((PerHandleData = (LPPER_HANDLE_DATA) GlobalAlloc(GPTR, sizeof(PER_HANDLE_DATA))) == NULL)
{
printf("GlobalAlloc() failed with error %d\n", GetLastError());
return 1;
}
else
printf("GlobalAlloc() for LPPER_HANDLE_DATA is OK!\n");
// Associate the accepted socket with the original completion port.
// The per-handle pointer is passed as the pointer-sized completion key.
printf("Socket number %d got connected...\n", Accept);
PerHandleData->Socket = Accept;
if (CreateIoCompletionPort((HANDLE) Accept, CompletionPort, (ULONG_PTR) PerHandleData, 0) == NULL)
{
printf("CreateIoCompletionPort() failed with error %d\n", GetLastError());
return 1;
}
else
printf("CreateIoCompletionPort() is OK!\n");
// Create per I/O socket information structure to associate with the WSARecv call below
if ((PerIoData = (LPPER_IO_OPERATION_DATA) GlobalAlloc(GPTR, sizeof(PER_IO_OPERATION_DATA))) == NULL)
{
printf("GlobalAlloc() failed with error %d\n", GetLastError());
return 1;
}
else
printf("GlobalAlloc() for LPPER_IO_OPERATION_DATA is OK!\n");
ZeroMemory(&(PerIoData->Overlapped), sizeof(OVERLAPPED));
PerIoData->BytesSEND = 0;
PerIoData->BytesRECV = 0;
PerIoData->DataBuf.len = DATA_BUFSIZE;
PerIoData->DataBuf.buf = PerIoData->Buffer;
Flags = 0;
// Post the first overlapped receive; its completion is handled by a worker thread
if (WSARecv(Accept, &(PerIoData->DataBuf), 1, &RecvBytes, &Flags, &(PerIoData->Overlapped), NULL) == SOCKET_ERROR)
{
if (WSAGetLastError() != ERROR_IO_PENDING)
{
printf("WSARecv() failed with error %d\n", WSAGetLastError());
return 1;
}
}
else
printf("WSARecv() is OK!\n");
} // End of the accept loop
} // End of main()
DWORD WINAPI ServerWorkerThread(LPVOID CompletionPortID)
{
HANDLE CompletionPort = (HANDLE) CompletionPortID;
DWORD BytesTransferred;
LPPER_HANDLE_DATA PerHandleData;
LPPER_IO_OPERATION_DATA PerIoData;
DWORD SendBytes, RecvBytes;
DWORD Flags;
while(TRUE)
{
if (GetQueuedCompletionStatus(CompletionPort, &BytesTransferred,
(PULONG_PTR)&PerHandleData, (LPOVERLAPPED *) &PerIoData, INFINITE) == 0)
{
printf("GetQueuedCompletionStatus() failed with error %d\n", GetLastError());
return 0;
}
else
printf("GetQueuedCompletionStatus() is OK!\n");
// First check whether the connection has been closed (zero bytes
// transferred). If so, close the socket and free the PER_HANDLE_DATA and
// PER_IO_OPERATION_DATA structures associated with it
if (BytesTransferred == 0)
{
printf("Closing socket %d\n", PerHandleData->Socket);
if (closesocket(PerHandleData->Socket) == SOCKET_ERROR)
{
printf("closesocket() failed with error %d\n", WSAGetLastError());
return 0;
}
else
printf("closesocket() is fine!\n");
GlobalFree(PerHandleData);
GlobalFree(PerIoData);
continue;
}
// Check to see if the BytesRECV field equals zero. If this is so, then
// this means a WSARecv call just completed so update the BytesRECV field
// with the BytesTransferred value from the completed WSARecv() call
if (PerIoData->BytesRECV == 0)
{
PerIoData->BytesRECV = BytesTransferred;
PerIoData->BytesSEND = 0;
}
else
{
PerIoData->BytesSEND += BytesTransferred;
}
if (PerIoData->BytesRECV > PerIoData->BytesSEND)
{
// Post another WSASend() request.
// Since WSASend() is not guaranteed to send all of the bytes requested,
// continue posting WSASend() calls until all received bytes are sent.
ZeroMemory(&(PerIoData->Overlapped), sizeof(OVERLAPPED));
PerIoData->DataBuf.buf = PerIoData->Buffer + PerIoData->BytesSEND;
PerIoData->DataBuf.len = PerIoData->BytesRECV - PerIoData->BytesSEND;
if (WSASend(PerHandleData->Socket, &(PerIoData->DataBuf), 1, &SendBytes, 0,
&(PerIoData->Overlapped), NULL) == SOCKET_ERROR)
{
if (WSAGetLastError() != ERROR_IO_PENDING)
{
printf("WSASend() failed with error %d\n", WSAGetLastError());
return 0;
}
}
else
printf("WSASend() is OK!\n");
}
else
{
PerIoData->BytesRECV = 0;
// Now that there are no more bytes to send post another WSARecv() request
Flags = 0;
ZeroMemory(&(PerIoData->Overlapped), sizeof(OVERLAPPED));
PerIoData->DataBuf.len = DATA_BUFSIZE;
PerIoData->DataBuf.buf = PerIoData->Buffer;
if (WSARecv(PerHandleData->Socket, &(PerIoData->DataBuf), 1, &RecvBytes, &Flags,
&(PerIoData->Overlapped), NULL) == SOCKET_ERROR)
{
if (WSAGetLastError() != ERROR_IO_PENDING)
{
printf("WSARecv() failed with error %d\n", WSAGetLastError());
return 0;
}
}
else
printf("WSARecv() is OK!\n");
}
}
}
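The bookkeeping that the worker thread performs on each completion can be isolated from the socket calls. The following portable C sketch mirrors the BytesRECV/BytesSEND logic of the listing above; the names echo_state_t, next_op_t, and on_completion are hypothetical, and the sketch assumes the zero-byte (connection closed) case has already been handled before the function is called.

```c
#include <stddef.h>

/* What the worker should post next on the socket. */
typedef enum { POST_SEND, POST_RECV } next_op_t;

typedef struct {
    size_t bytes_recv;  /* bytes delivered by the last receive (0 = none pending) */
    size_t bytes_send;  /* how many of those bytes have been echoed so far */
    size_t offset;      /* where in the buffer the next send starts */
    size_t length;      /* how many bytes the next send should cover */
} echo_state_t;

/* Called once per completion with the byte count reported for it. */
next_op_t on_completion(echo_state_t *st, size_t transferred)
{
    if (st->bytes_recv == 0) {
        /* A receive just completed: remember how much must be echoed. */
        st->bytes_recv = transferred;
        st->bytes_send = 0;
    } else {
        /* A send just completed, possibly only partially. */
        st->bytes_send += transferred;
    }

    if (st->bytes_recv > st->bytes_send) {
        /* Some received bytes are still unsent: send the remainder. */
        st->offset = st->bytes_send;
        st->length = st->bytes_recv - st->bytes_send;
        return POST_SEND;
    }

    /* Everything was echoed: reset and wait for more data. */
    st->bytes_recv = 0;
    st->offset = 0;
    st->length = 0;
    return POST_RECV;
}
```

For example, if a receive delivers 100 bytes and the first send transfers only 60, the next call asks for a send of the remaining 40 bytes starting at offset 60; once those complete, the function asks for a new receive.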
Build and run the project.
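One detail of the sample is easy to miss: the worker thread receives a pointer to the OVERLAPPED member and treats it as a pointer to the whole PER_IO_OPERATION_DATA structure, which works only because Overlapped is the first member. The portable sketch below illustrates the idea with stand-in types (overlapped_t and io_data_t are hypothetical, not the Windows definitions) and also shows an offsetof-based recovery, which is the approach taken by the CONTAINING_RECORD macro in the Windows headers.

```c
#include <stddef.h>

/* Stand-in for the Windows OVERLAPPED structure; its contents do not matter here. */
typedef struct {
    unsigned long internal[4];
} overlapped_t;

/* Stand-in for per-I/O data: the overlapped member comes first. */
typedef struct {
    overlapped_t overlapped;
    char         buffer[64];
    unsigned     bytes_recv;
    unsigned     bytes_send;
} io_data_t;

/* Plain cast: valid only while `overlapped` stays the first member. */
io_data_t *io_from_overlapped_cast(overlapped_t *ov)
{
    return (io_data_t *)ov;
}

/* offsetof-based recovery: keeps working even if the member is moved. */
io_data_t *io_from_overlapped(overlapped_t *ov)
{
    return (io_data_t *)((char *)ov - offsetof(io_data_t, overlapped));
}
```

Both functions return the same address here. The second form is the safer habit, because reordering the structure members silently breaks the plain cast.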