< Performance, Scalability & Winsock2 APIs | Scalability Main | Continue I/O Port Completion...(Adding header file) >
What do we have in this chapter 6 part 3?
The I/O Completion Port IPv4/IPv6 Server Program Example
Create a new empty Win32 console mode application and add the project/solution name.
Add the following source code.
// Sample: I/O Completion Port IPv4/IPv6 Server
//
// Files:
// IOComplePortServersrc.cpp - this file
// resolve.cpp - Common name resolution routines
// resolve.h - Header file for name resolution routines
//
// Description:
// This sample illustrates how to write a scalable, high-performance
// Winsock server. It is implemented as a TCP (IPv4/IPv6) echo server
// designed to handle many connections simultaneously: for each accepted
// connection, data is read and then sent back to the client. Several
// limits are imposed so that as many concurrent connections as possible
// can be handled. First, each connection has only a single overlapped
// receive posted at any given time. Second, the number of outstanding
// overlapped sends is capped (MAX_OVERLAPPED_SENDS by default, see -os).
// This prevents a malicious client from connecting and only sending data,
// which would cause an unlimited amount of data to be buffered, since the
// sends would block as the client is not receiving data (and the TCP
// window size goes to zero).
//
// This sample illustrates overlapped IO with a completion port for
// TCP over both IPv4 and IPv6. This sample uses the
// getaddrinfo/getnameinfo APIs which allows this application to be
// IP version independent. That is, the desired address family
// (AF_INET or AF_INET6) can be determined simply from the string
// address passed via the -l option.
//
// For TCP, a listening socket is created for each IP address family
// available. Each socket is associated with a completion port and
// worker threads are spawned (one for each CPU available). For each
// listening socket, a number of AcceptEx calls are posted. The worker threads
// then wait for one of these to complete. Upon completion, the client
// socket is associated with the completion port and several receives
// are posted. The AcceptEx is reposted as well. Once data is received
// on a client socket, it is echoed back.
//
// The important thing to remember with IOCP is that the completion events
// may occur out of order; however, the buffers are guaranteed to be filled
// in the order posted. For our echo server this can cause problems as
// receive N+1 may complete before receive N. We can't echo back N+1 before
// echoing N. There are two approaches possible. First, we could surmise
// that since receive N+1 has completed then we can safely echo back receive
// N and N+1 at that time (to maintain the data ordering). To do this properly
// you'll have to call WSAGetOverlappedResult on receive N in order to find
// out how many bytes were received to echo it back. The second approach
// (which is implemented in this sample) is to keep a list of receive
// buffers that completed out of order. This list is maintained in the
// per-socket data structure. When receive N+1 completes, it will notice that
// receive N has not completed. The buffer is then queued in the out of
// order send list. Once receive N completes, its buffer is queued -- the
// queue is ordered in the same order that the receive operations are.
// Another routine (DoSends) goes through this list and sends those buffers
// that are available and in order. If any gaps are detected no further buffers
// are sent (as we will wait for that receive to complete and insert its
// buffer into the list so that the next call to DoSends will correctly
// send the buffers in the right order).
//
// For example:
// If this sample is called with the following command lines:
// IOComplePortServer -l fe80::2efe:1234 -e 5150
// IOComplePortServer -l ::
// Then the server creates an IPv6 socket as an IPv6 address was provided.
//
// On the other hand, with the following command line:
// IOComplePortServer -l 7.7.7.1 -e 5150
// IOComplePortServer -l 0.0.0.0
// Then the server creates an IPv4 socket.
//
// Calling the server without the -a and -l options (for example with only
// -e 5150) creates a server that listens on both IPv4 and IPv6 (if installed).
// Note that main prints the usage text and exits when no arguments are given.
//
// Usage:
// IOComplePortServer [options]
// -a 4|6 Address family, 4 = IPv4, 6 = IPv6 [default = both]
// -b size Buffer size for send/recv
// -e port Port number
// -l addr Local address to bind to [default INADDR_ANY for IPv4 or INADDR6_ANY for IPv6]
// -os count Maximum number of overlapped send operations to allow simultaneously
// -or count Maximum number of overlapped receive operations to allow simultaneously
// -oa count Maximum number of overlapped accepts to allow simultaneously
// -o count Number of initial overlapped accepts to post
//
// Link to ws2_32.lib
#include <winsock2.h>
#include <ws2tcpip.h>
// Link to Mswsock.lib, Microsoft specific
#include <mswsock.h>
#include <windows.h>
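#include <stdio.h>
#include <stdlib.h>
#include <string.h> // strlen, memset
#include <ctype.h> // tolower
#include <stdarg.h> // va_list, used by dbgprint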
#include "resolve.h"
#define DEFAULT_BUFFER_SIZE 4096 // default buffer size
#define DEFAULT_OVERLAPPED_COUNT 5 // Default number of initial overlapped accepts per listening socket
#define MAX_OVERLAPPED_ACCEPTS 500
#define MAX_OVERLAPPED_SENDS 200
#define MAX_OVERLAPPED_RECVS 200
#define MAX_COMPLETION_THREAD_COUNT 32// Maximum number of completion threads allowed
#define BURST_ACCEPT_COUNT 100
int gAddressFamily = AF_UNSPEC, // default to unspecified
gSocketType = SOCK_STREAM, // default to TCP socket type
gProtocol = IPPROTO_TCP, // default to TCP protocol
gBufferSize = DEFAULT_BUFFER_SIZE,
gInitialAccepts = DEFAULT_OVERLAPPED_COUNT,
gMaxAccepts = MAX_OVERLAPPED_ACCEPTS,
gMaxReceives = MAX_OVERLAPPED_RECVS,
gMaxSends = MAX_OVERLAPPED_SENDS;
char *gBindAddr = NULL, // local interface to bind to
*gBindPort = "5150"; // local port to bind to
// Statistics counters
volatile LONG gBytesRead=0, gBytesSent=0, gStartTime=0, gBytesReadLast=0, gBytesSentLast=0,
gStartTimeLast=0, gConnections=0, gConnectionsLast=0, gOutstandingSends=0;
// This is our per I/O buffer. It contains a WSAOVERLAPPED structure as well
// as other necessary information for handling an IO operation on a socket.
typedef struct _BUFFER_OBJ
{
WSAOVERLAPPED ol;
SOCKET sclient; // Used for AcceptEx client socket
HANDLE PostAccept;
char *buf; // Buffer for recv/send/AcceptEx
int buflen; // Length of the buffer
int operation; // Type of operation issued
#define OP_ACCEPT 0 // AcceptEx
#define OP_READ 1 // WSARecv/WSARecvFrom
#define OP_WRITE 2 // WSASend/WSASendTo
SOCKADDR_STORAGE addr;
int addrlen;
struct _SOCKET_OBJ *sock;
struct _BUFFER_OBJ *next;
} BUFFER_OBJ;
// This is our per-socket object. It contains information about the socket handle
// and is the completion key returned from each GetQueuedCompletionStatus call.
typedef struct _SOCKET_OBJ
{
SOCKET s; // Socket handle
int af, // Address family of socket (AF_INET, AF_INET6)
bClosing; // Is the socket closing?
volatile LONG OutstandingRecv, // Number of outstanding overlapped receives
OutstandingSend, // Number of outstanding overlapped sends
PendingSend; // Number of sends queued but not yet posted
CRITICAL_SECTION SockCritSec; // Protect access to this structure
struct _SOCKET_OBJ *next;
} SOCKET_OBJ;
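// This is our per listening socket object. It tracks the pending AcceptEx
// operations and is the completion key for accept completions.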
typedef struct _LISTEN_OBJ
{
SOCKET s;
int AddressFamily;
BUFFER_OBJ *PendingAccepts; // Pending AcceptEx buffers
volatile long PendingAcceptCount;
int HiWaterMark, LoWaterMark;
HANDLE AcceptEvent;
HANDLE RepostAccept;
volatile long RepostCount;
// Pointers to Microsoft specific extensions.
LPFN_ACCEPTEX lpfnAcceptEx;
LPFN_GETACCEPTEXSOCKADDRS lpfnGetAcceptExSockaddrs;
CRITICAL_SECTION ListenCritSec;
struct _LISTEN_OBJ *next;
} LISTEN_OBJ;
// Serialize access to the free lists below
CRITICAL_SECTION gBufferListCs, gSocketListCs, gPendingCritSec;
// Lookaside lists for free buffers and socket objects
BUFFER_OBJ *gFreeBufferList=NULL;
SOCKET_OBJ *gFreeSocketList=NULL;
BUFFER_OBJ *gPendingSendList=NULL, *gPendingSendListEnd=NULL;
int PostSend(SOCKET_OBJ *sock, BUFFER_OBJ *sendobj);
int PostRecv(SOCKET_OBJ *sock, BUFFER_OBJ *recvobj);
void FreeBufferObj(BUFFER_OBJ *obj);
// Function: usage
// Description: Prints usage information and exits the process.
int usage(char *progname)
{
fprintf(stderr, "Usage: %s [-a 4|6] [-b size] [-e port] [-l local-addr] [-oa count] [-os count] [-or count] [-o count]\n", progname);
fprintf(stderr, " -a 4|6 Address family, 4 = IPv4, 6 = IPv6\n"
" [default = listen on both IPv4 and IPv6]\n"
" -b size Buffer size for send/recv [default = %d]\n"
" -e port Port number [default = %s]\n"
" -l addr Local address to bind to [default INADDR_ANY for IPv4 or INADDR6_ANY for IPv6]\n"
" -oa count Maximum overlapped accepts to allow\n"
" -os count Maximum overlapped sends to allow\n"
" -or count Maximum overlapped receives to allow\n"
" -o count Initial number of overlapped accepts to post\n",
gBufferSize,
gBindPort
);
// Exit here: callers such as ValidateArgs go on to read the missing argument otherwise
exit(1);
}
// Function: dbgprint
// Description: Prints a message if compiled with the DEBUG flag.
void dbgprint(char *format,...)
{
#ifdef DEBUG
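va_list vl;
char dbgbuf[2048];
va_start(vl, format);
// Use the ANSI variants explicitly since the buffer is a char array
wvsprintfA(dbgbuf, format, vl);
va_end(vl);
// Never pass the formatted text as the format string itself
printf("%s", dbgbuf);
OutputDebugStringA(dbgbuf);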
#endif
}
// Function: EnqueuePendingOperation
// Description: Enqueues a buffer object into a list (at the end).
void EnqueuePendingOperation(BUFFER_OBJ **head, BUFFER_OBJ **end, BUFFER_OBJ *obj, int op)
{
EnterCriticalSection(&gPendingCritSec);
// Only queued sends are counted
if (op == OP_WRITE)
InterlockedIncrement(&obj->sock->PendingSend);
obj->next = NULL;
if (*end)
{
(*end)->next = obj;
(*end) = obj;
}
else
{
(*head) = (*end) = obj;
}
LeaveCriticalSection(&gPendingCritSec);
return;
}
// Function: DequeuePendingOperation
// Description: Dequeues the first entry in the list.
BUFFER_OBJ *DequeuePendingOperation(BUFFER_OBJ **head, BUFFER_OBJ **end, int op)
{
BUFFER_OBJ *obj=NULL;
EnterCriticalSection(&gPendingCritSec);
if (*head)
{
obj = *head;
(*head) = obj->next;
// If next is NULL, no more objects are in the queue
if (obj->next == NULL)
{
(*end) = NULL;
}
// Only queued sends are counted
if (op == OP_WRITE)
InterlockedDecrement(&obj->sock->PendingSend);
}
LeaveCriticalSection(&gPendingCritSec);
return obj;
}
// Function: ProcessPendingOperations
// Description:
// This function goes through the list of pending send operations and posts them
// as long as the maximum number of outstanding sends is not exceeded.
void ProcessPendingOperations()
{
BUFFER_OBJ *sendobj=NULL;
while(gOutstandingSends < gMaxSends)
{
sendobj = DequeuePendingOperation(&gPendingSendList, &gPendingSendListEnd, OP_WRITE);
if (sendobj)
{
if (PostSend(sendobj->sock, sendobj) == SOCKET_ERROR)
{
// Cleanup
printf("ProcessPendingOperations: PostSend failed!\n");
FreeBufferObj(sendobj);
break;
}
}
else
{
break;
}
}
return;
}
// Function: InsertPendingAccept
// Description: Inserts a pending accept operation into the listening object.
void InsertPendingAccept(LISTEN_OBJ *listenobj, BUFFER_OBJ *obj)
{
obj->next = NULL;
EnterCriticalSection(&listenobj->ListenCritSec);
if (listenobj->PendingAccepts == NULL)
{
listenobj->PendingAccepts = obj;
}
else
{
// Insert at head - order doesn't really matter
obj->next = listenobj->PendingAccepts;
listenobj->PendingAccepts = obj;
}
LeaveCriticalSection(&listenobj->ListenCritSec);
}
// Function: RemovePendingAccept
// Description:
// Removes the indicated accept buffer object from the list of pending
// accepts in the listening object.
void RemovePendingAccept(LISTEN_OBJ *listenobj, BUFFER_OBJ *obj)
{
BUFFER_OBJ *ptr=NULL, *prev=NULL;
EnterCriticalSection(&listenobj->ListenCritSec);
// Search list until we find the object
ptr = listenobj->PendingAccepts;
while ( (ptr) && (ptr != obj) )
{
prev = ptr;
ptr = ptr->next;
}
if (ptr == NULL)
{
// Object is not in the list, so there is nothing to unlink
}
else if (prev)
{
// Object is somewhere after the first entry
prev->next = obj->next;
}
else
{
// Object is the first entry
listenobj->PendingAccepts = obj->next;
}
LeaveCriticalSection(&listenobj->ListenCritSec);
}
// Function: GetBufferObj
// Description:
// Allocate a BUFFER_OBJ. A look aside list is maintained to increase performance
// as these objects are allocated frequently.
BUFFER_OBJ *GetBufferObj(int buflen)
{
BUFFER_OBJ *newobj=NULL;
EnterCriticalSection(&gBufferListCs);
if (gFreeBufferList == NULL)
{
// Allocate the object
newobj = (BUFFER_OBJ *)HeapAlloc(
GetProcessHeap(),
HEAP_ZERO_MEMORY,
sizeof(BUFFER_OBJ) + (sizeof(BYTE) * buflen)
);
if (newobj == NULL)
{
fprintf(stderr, "GetBufferObj: HeapAlloc failed: %d\n", GetLastError());
}
}
else
{
newobj = gFreeBufferList;
gFreeBufferList = newobj->next;
newobj->next = NULL;
}
LeaveCriticalSection(&gBufferListCs);
if (newobj)
{
newobj->buf = (char *)(((char *)newobj) + sizeof(BUFFER_OBJ));
newobj->buflen = buflen;
newobj->addrlen = sizeof(newobj->addr);
}
return newobj;
}
// Function: FreeBufferObj
// Description: Free the buffer object. This adds the object to the free look aside list.
void FreeBufferObj(BUFFER_OBJ *obj)
{
EnterCriticalSection(&gBufferListCs);
memset(obj, 0, sizeof(BUFFER_OBJ) + gBufferSize);
obj->next = gFreeBufferList;
gFreeBufferList = obj;
LeaveCriticalSection(&gBufferListCs);
}
// Function: GetSocketObj
// Description:
// Allocate a socket object and initialize its members. A socket object is
// allocated for each socket created (either by socket or accept).
// Socket objects are returned from a look aside list if available. Otherwise, a new object is allocated.
SOCKET_OBJ *GetSocketObj(SOCKET s, int af)
{
SOCKET_OBJ *sockobj=NULL;
EnterCriticalSection(&gSocketListCs);
if (gFreeSocketList == NULL)
{
sockobj = (SOCKET_OBJ *)HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(SOCKET_OBJ));
if (sockobj == NULL)
{
fprintf(stderr, "GetSocketObj: HeapAlloc failed: %d\n", GetLastError());
}
else
{
InitializeCriticalSection(&sockobj->SockCritSec);
}
}
else
{
sockobj = gFreeSocketList;
gFreeSocketList = sockobj->next;
sockobj->next = NULL;
}
LeaveCriticalSection(&gSocketListCs);
// Initialize the members
if (sockobj)
{
sockobj->s = s;
sockobj->af = af;
}
return sockobj;
}
// Function: FreeSocketObj
// Description: Frees a socket object. The object is added to the lookaside list.
void FreeSocketObj(SOCKET_OBJ *obj)
{
// Close the socket if it hasn't already been closed
if (obj->s != INVALID_SOCKET)
{
printf("FreeSocketObj: closing socket\n");
closesocket(obj->s);
obj->s = INVALID_SOCKET;
}
EnterCriticalSection(&gSocketListCs);
// Reset every member except the critical section: it is initialized once in
// GetSocketObj, and a CRITICAL_SECTION must not be copied or overwritten
obj->af = 0;
obj->bClosing = FALSE;
obj->OutstandingRecv = 0;
obj->OutstandingSend = 0;
obj->PendingSend = 0;
obj->next = gFreeSocketList;
gFreeSocketList = obj;
LeaveCriticalSection(&gSocketListCs);
}
// Function: ValidateArgs
// Description: Parses the command line arguments and sets up some global variables.
void ValidateArgs(int argc, char **argv)
{
int i;
for(i=1; i < argc ;i++)
{
if (((argv[i][0] != '/') && (argv[i][0] != '-')) || (strlen(argv[i]) < 2))
usage(argv[0]);
else
{
switch (tolower(argv[i][1]))
{
case 'a': // address family - IPv4 or IPv6
if (i+1 >= argc)
usage(argv[0]);
if (argv[i+1][0] == '4')
gAddressFamily = AF_INET;
else if (argv[i+1][0] == '6')
gAddressFamily = AF_INET6;
else
usage(argv[0]);
i++;
break;
case 'b': // buffer size for send/recv
if (i+1 >= argc)
usage(argv[0]);
gBufferSize = atol(argv[++i]);
break;
case 'e': // endpoint - port number
if (i+1 >= argc)
usage(argv[0]);
gBindPort = argv[++i];
break;
case 'l': // local address for binding
if (i+1 >= argc)
usage(argv[0]);
gBindAddr = argv[++i];
break;
case 'o': // overlapped count
if (i+1 >= argc)
usage(argv[0]);
if (strlen(argv[i]) == 2) // Overlapped accept initial count
{
gInitialAccepts = atol(argv[++i]);
}
else if (strlen(argv[i]) == 3)
{
if (tolower(argv[i][2]) == 'a')
gMaxAccepts = atol(argv[++i]);
else if (tolower(argv[i][2]) == 's')
gMaxSends = atol(argv[++i]);
else if (tolower(argv[i][2]) == 'r')
gMaxReceives = atol(argv[++i]);
else
usage(argv[0]);
}
else
{
usage(argv[0]);
}
break;
default:
usage(argv[0]);
break;
}
}
}
}
// Function: PrintStatistics
// Description: Print the send/recv statistics for the server
void PrintStatistics()
{
ULONG bps, tick, elapsed, cps;
tick = GetTickCount();
elapsed = (tick - gStartTime) / 1000;
if (elapsed == 0)
return;
printf("\n");
// Calculate average bytes per second
bps = gBytesSent / elapsed;
printf("Average BPS sent: %lu [%lu]\n", bps, gBytesSent);
bps = gBytesRead / elapsed;
printf("Average BPS read: %lu [%lu]\n", bps, gBytesRead);
elapsed = (tick - gStartTimeLast) / 1000;
if (elapsed == 0)
return;
// Calculate bytes per second over the last X seconds
bps = gBytesSentLast / elapsed;
printf("Current BPS sent: %lu\n", bps);
bps = gBytesReadLast / elapsed;
printf("Current BPS read: %lu\n", bps);
cps = gConnectionsLast / elapsed;
printf("Current conns/sec: %lu\n", cps);
printf("Total connections: %lu\n", gConnections);
InterlockedExchange(&gBytesSentLast, 0);
InterlockedExchange(&gBytesReadLast, 0);
InterlockedExchange(&gConnectionsLast, 0);
gStartTimeLast = tick;
}
// Function: PostRecv
// Description: Post an overlapped receive operation on the socket.
int PostRecv(SOCKET_OBJ *sock, BUFFER_OBJ *recvobj)
{
WSABUF wbuf;
DWORD bytes, flags;
int rc;
recvobj->operation = OP_READ;
wbuf.buf = recvobj->buf;
wbuf.len = recvobj->buflen;
flags = 0;
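EnterCriticalSection(&sock->SockCritSec);
// Start from a clean OVERLAPPED since buffer objects are reused
ZeroMemory(&recvobj->ol, sizeof(recvobj->ol));
// Count the operation before posting it: the completion can be handled by
// another thread before WSARecv returns
InterlockedIncrement(&sock->OutstandingRecv);
rc = WSARecv(sock->s, &wbuf, 1, &bytes, &flags, &recvobj->ol, NULL);
if (rc == SOCKET_ERROR)
{
rc = NO_ERROR;
if (WSAGetLastError() != WSA_IO_PENDING)
{
dbgprint("PostRecv: WSARecv* failed: %d\n", WSAGetLastError());
rc = SOCKET_ERROR;
}
}
if (rc == SOCKET_ERROR)
{
// Nothing was posted, so undo the count
InterlockedDecrement(&sock->OutstandingRecv);
}
LeaveCriticalSection(&sock->SockCritSec);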
return rc;
}
// Function: PostSend
// Description: Post an overlapped send operation on the socket.
int PostSend(SOCKET_OBJ *sock, BUFFER_OBJ *sendobj)
{
WSABUF wbuf;
DWORD bytes;
int rc, err;
sendobj->operation = OP_WRITE;
wbuf.buf = sendobj->buf;
wbuf.len = sendobj->buflen;
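EnterCriticalSection(&sock->SockCritSec);
// Start from a clean OVERLAPPED since buffer objects are reused
ZeroMemory(&sendobj->ol, sizeof(sendobj->ol));
// Count the operation before posting it: the completion can be handled by
// another thread before WSASend returns
InterlockedIncrement(&sock->OutstandingSend);
InterlockedIncrement(&gOutstandingSends);
rc = WSASend(sock->s, &wbuf, 1, &bytes, 0, &sendobj->ol, NULL);
if (rc == SOCKET_ERROR)
{
rc = NO_ERROR;
if ((err = WSAGetLastError()) != WSA_IO_PENDING)
{
// WSAENOBUFS and other failures are reported to the caller rather than breaking into a debugger
dbgprint("PostSend: WSASend* failed: %d [internal = %lu]\n", err, (unsigned long)sendobj->ol.Internal);
rc = SOCKET_ERROR;
}
}
if (rc == SOCKET_ERROR)
{
// Nothing was posted, so undo the counts
InterlockedDecrement(&sock->OutstandingSend);
InterlockedDecrement(&gOutstandingSends);
}
LeaveCriticalSection(&sock->SockCritSec);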
return rc;
}
// Function: PostAccept
// Description: Post an overlapped accept on a listening socket.
int PostAccept(LISTEN_OBJ *listen, BUFFER_OBJ *acceptobj)
{
DWORD bytes;
int rc;
acceptobj->operation = OP_ACCEPT;
// Create the client socket for an incoming connection
acceptobj->sclient = socket(listen->AddressFamily, SOCK_STREAM, IPPROTO_TCP);
if (acceptobj->sclient == INVALID_SOCKET)
{
fprintf(stderr, "PostAccept: socket failed: %d\n", WSAGetLastError());
return -1;
}
rc = listen->lpfnAcceptEx(
listen->s,
acceptobj->sclient,
acceptobj->buf,
acceptobj->buflen - ((sizeof(SOCKADDR_STORAGE) + 16) * 2),
sizeof(SOCKADDR_STORAGE) + 16,
sizeof(SOCKADDR_STORAGE) + 16,
&bytes,
&acceptobj->ol
);
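if (rc == FALSE)
{
if (WSAGetLastError() != WSA_IO_PENDING)
{
printf("PostAccept: AcceptEx failed: %d\n", WSAGetLastError());
// The accept was not posted, so release the client socket created above
closesocket(acceptobj->sclient);
acceptobj->sclient = INVALID_SOCKET;
return SOCKET_ERROR;
}
}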
// Increment the outstanding overlapped count for this socket
InterlockedIncrement(&listen->PendingAcceptCount);
return NO_ERROR;
}
// Function: HandleIo
// Description:
// This function handles the IO on a socket. In the event of a receive, the
// completed receive is posted again. For completed accepts, another AcceptEx
// is posted. For completed sends, the buffer is freed.
void HandleIo(ULONG_PTR key, BUFFER_OBJ *buf, HANDLE CompPort, DWORD BytesTransfered, DWORD error)
{
LISTEN_OBJ *listenobj=NULL;
SOCKET_OBJ *sockobj=NULL,
*clientobj=NULL; // New client object for accepted connections
BUFFER_OBJ *recvobj=NULL, // Used to post new receives on accepted connections
*sendobj=NULL; // Used to post new sends for data received
BOOL bCleanupSocket;
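bCleanupSocket = FALSE;
if (error != NO_ERROR)
{
dbgprint("OP = %d; Error = %d\n", buf->operation, error);
// An error occurred on a TCP socket, free the associated per I/O buffer
// and see if there are any more outstanding operations. If so we must
// wait until they are complete as well.
if (buf->operation != OP_ACCEPT)
{
sockobj = (SOCKET_OBJ *)key;
// Mark the socket as closing so that no further operations are posted on it
sockobj->bClosing = TRUE;
if (buf->operation == OP_READ)
{
if ((InterlockedDecrement(&sockobj->OutstandingRecv) == 0) && (sockobj->OutstandingSend == 0) )
{
dbgprint("Freeing socket obj after failed receive\n");
FreeSocketObj(sockobj);
}
}
else if (buf->operation == OP_WRITE)
{
// The failed send no longer counts against the global send limit
InterlockedDecrement(&gOutstandingSends);
if ((InterlockedDecrement(&sockobj->OutstandingSend) == 0) && (sockobj->OutstandingRecv == 0) )
{
dbgprint("Freeing socket obj after failed send\n");
FreeSocketObj(sockobj);
}
}
}
else
{
listenobj = (LISTEN_OBJ *)key;
printf("Accept failed\n");
// Take the buffer off the pending accept list before it is recycled
RemovePendingAccept(listenobj, buf);
InterlockedDecrement(&listenobj->PendingAcceptCount);
if (buf->sclient != INVALID_SOCKET)
{
closesocket(buf->sclient);
buf->sclient = INVALID_SOCKET;
}
// Ask the main thread to post a replacement accept
InterlockedIncrement(&listenobj->RepostCount);
SetEvent(listenobj->RepostAccept);
}
FreeBufferObj(buf);
return;
}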
if (buf->operation == OP_ACCEPT)
{
HANDLE hrc;
SOCKADDR_STORAGE *LocalSockaddr=NULL, *RemoteSockaddr=NULL;
int LocalSockaddrLen,RemoteSockaddrLen;
listenobj = (LISTEN_OBJ *)key;
// Update counters
InterlockedIncrement(&gConnections);
InterlockedIncrement(&gConnectionsLast);
InterlockedDecrement(&listenobj->PendingAcceptCount);
InterlockedExchangeAdd(&gBytesRead, BytesTransfered);
InterlockedExchangeAdd(&gBytesReadLast, BytesTransfered);
// Retrieve the local and remote addresses of the accepted connection
listenobj->lpfnGetAcceptExSockaddrs(
buf->buf,
buf->buflen - ((sizeof(SOCKADDR_STORAGE) + 16) * 2),
sizeof(SOCKADDR_STORAGE) + 16,
sizeof(SOCKADDR_STORAGE) + 16,
(SOCKADDR **)&LocalSockaddr,
&LocalSockaddrLen,
(SOCKADDR **)&RemoteSockaddr,
&RemoteSockaddrLen
);
RemovePendingAccept(listenobj, buf);
// Get a new SOCKET_OBJ for the client connection
clientobj = GetSocketObj(buf->sclient, listenobj->AddressFamily);
if (clientobj)
{
// Inherit the listening socket's properties so that calls such as
// getsockname and shutdown work on the accepted socket
if (setsockopt(clientobj->s, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char *)&listenobj->s, sizeof(listenobj->s)) == SOCKET_ERROR)
{
fprintf(stderr, "setsockopt: SO_UPDATE_ACCEPT_CONTEXT failed: %d\n", WSAGetLastError());
}
// Associate the new connection to our completion port
hrc = CreateIoCompletionPort((HANDLE)clientobj->s, CompPort, (ULONG_PTR)clientobj, 0);
if (hrc == NULL)
{
fprintf(stderr, "CompletionThread: CreateIoCompletionPort failed: %d\n", GetLastError());
// The connection cannot be serviced: release the socket object
// (which closes the socket) and the buffer
FreeSocketObj(clientobj);
FreeBufferObj(buf);
}
else
{
sendobj = buf;
sendobj->buflen = BytesTransfered;
// Queue the echo of the data received with the accept
sendobj->sock = clientobj;
EnqueuePendingOperation(&gPendingSendList, &gPendingSendListEnd, sendobj, OP_WRITE);
}
}
else
{
// Can't allocate a socket structure so close the connection
closesocket(buf->sclient);
buf->sclient = INVALID_SOCKET;
FreeBufferObj(buf);
}
// Errors were handled at the top of this function, so none is pending here
InterlockedIncrement(&listenobj->RepostCount);
SetEvent(listenobj->RepostAccept);
}
else if (buf->operation == OP_READ)
{
sockobj = (SOCKET_OBJ *)key;
InterlockedDecrement(&sockobj->OutstandingRecv);
// Receive completed successfully
if (BytesTransfered > 0)
{
InterlockedExchangeAdd(&gBytesRead, BytesTransfered);
InterlockedExchangeAdd(&gBytesReadLast, BytesTransfered);
// Make the recv a send
sendobj = buf;
sendobj->buflen = BytesTransfered;
sendobj->sock = sockobj;
//PostSend(sockobj, sendobj);
EnqueuePendingOperation(&gPendingSendList, &gPendingSendListEnd, sendobj, OP_WRITE);
}
else
{
// dbgprint("Got 0 byte receive\n");
// Graceful close - the receive returned 0 bytes read
sockobj->bClosing = TRUE;
// Free the receive buffer
FreeBufferObj(buf);
// If this was the last outstanding operation on socket, clean it up
EnterCriticalSection(&sockobj->SockCritSec);
if ((sockobj->OutstandingSend == 0) && (sockobj->OutstandingRecv == 0) )
{
bCleanupSocket = TRUE;
}
LeaveCriticalSection(&sockobj->SockCritSec);
}
}
else if (buf->operation == OP_WRITE)
{
sockobj = (SOCKET_OBJ *)key;
InterlockedDecrement(&sockobj->OutstandingSend);
InterlockedDecrement(&gOutstandingSends);
// Update the counters
InterlockedExchangeAdd(&gBytesSent, BytesTransfered);
InterlockedExchangeAdd(&gBytesSentLast, BytesTransfered);
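buf->buflen = gBufferSize;
if (sockobj->bClosing == FALSE)
{
buf->sock = sockobj;
if (PostRecv(sockobj, buf) == SOCKET_ERROR)
{
// The receive could not be posted: release the buffer and close the connection
FreeBufferObj(buf);
sockobj->bClosing = TRUE;
}
}
else
{
// The connection is closing, so the buffer is no longer needed
FreeBufferObj(buf);
}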
}
ProcessPendingOperations();
if (sockobj)
{
if (error != NO_ERROR)
{
printf("err = %d\n", error);
sockobj->bClosing = TRUE;
}
// Check to see if socket is closing
if ( (sockobj->OutstandingSend == 0) && (sockobj->OutstandingRecv == 0) && (sockobj->bClosing) )
{
bCleanupSocket = TRUE;
}
if (bCleanupSocket)
{
closesocket(sockobj->s);
sockobj->s = INVALID_SOCKET;
FreeSocketObj(sockobj);
}
}
return;
}
// Function: CompletionThread
// Description:
// This is the completion thread which services our completion port. One of
// these threads is created per processor on the system. The thread sits in
// an infinite loop calling GetQueuedCompletionStatus and handling socket IO that completed.
DWORD WINAPI CompletionThread(LPVOID lpParam)
{
ULONG_PTR Key;
SOCKET s;
BUFFER_OBJ *bufobj=NULL; // Per I/O object for completed I/O
OVERLAPPED *lpOverlapped=NULL; // Pointer to overlapped structure for completed I/O
HANDLE CompletionPort; // Completion port handle
DWORD BytesTransfered, // Number of bytes transferred
Flags; // Flags for completed I/O
int rc, error;
CompletionPort = (HANDLE)lpParam;
while (1)
{
error = NO_ERROR;
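rc = GetQueuedCompletionStatus(CompletionPort, &BytesTransfered, (PULONG_PTR)&Key, &lpOverlapped, INFINITE);
if (lpOverlapped == NULL)
{
// No completion packet was dequeued (the call itself failed), so there
// is no buffer object to process
fprintf(stderr, "CompletionThread: GetQueuedCompletionStatus failed: %d\n", GetLastError());
break;
}
bufobj = CONTAINING_RECORD(lpOverlapped, BUFFER_OBJ, ol);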
if (rc == FALSE)
{
// If the call fails, call WSAGetOverlappedResult to translate the
// error code into a Winsock error code.
if (bufobj->operation == OP_ACCEPT)
{
s = ((LISTEN_OBJ *)Key)->s;
}
else
{
s = ((SOCKET_OBJ *)Key)->s;
}
dbgprint("CompletionThread: GetQueuedCompletionStatus failed: %d [0x%x]\n", GetLastError(), lpOverlapped->Internal);
rc = WSAGetOverlappedResult(s, &bufobj->ol, &BytesTransfered, FALSE, &Flags);
if (rc == FALSE)
{
error = WSAGetLastError();
}
}
// Handle the IO operation
HandleIo(Key, bufobj, CompletionPort, BytesTransfered, error);
}
ExitThread(0);
return 0;
}
// Function: main
// Description:
// This is the main program. It parses the command line and creates
// the listening sockets. For TCP the sockets are used to accept incoming
// client connections. I/O on each client connection is serviced by the
// completion port worker threads until the connection is closed, while
// this thread reposts accepts and prints statistics.
int main(int argc, char **argv)
{
WSADATA wsd;
SYSTEM_INFO sysinfo;
LISTEN_OBJ *ListenSockets=NULL, *listenobj=NULL;
SOCKET_OBJ *sockobj=NULL;
BUFFER_OBJ *acceptobj=NULL;
GUID guidAcceptEx = WSAID_ACCEPTEX, guidGetAcceptExSockaddrs = WSAID_GETACCEPTEXSOCKADDRS;
DWORD bytes;
// WaitEvents holds the worker thread handles plus two events per listening socket
HANDLE CompletionPort, WaitEvents[WSA_MAXIMUM_WAIT_EVENTS], hrc;
int endpointcount=0, waitcount=0, interval, rc, i;
struct addrinfo *res=NULL, *ptr=NULL;
if(argc < 2)
{
usage(argv[0]);
exit(1);
}
// Validate the command line
ValidateArgs(argc, argv);
// Load Winsock
if (WSAStartup(MAKEWORD(2,2), &wsd) != 0)
{
fprintf(stderr, "unable to load Winsock!\n");
return -1;
}
InitializeCriticalSection(&gSocketListCs);
InitializeCriticalSection(&gBufferListCs);
InitializeCriticalSection(&gPendingCritSec);
// Create the completion port used by this server
CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, (ULONG_PTR)NULL, 0);
if (CompletionPort == NULL)
{
fprintf(stderr, "CreateIoCompletionPort failed: %d\n", GetLastError());
return -1;
}
// Find out how many processors are on this system
GetSystemInfo(&sysinfo);
if (sysinfo.dwNumberOfProcessors > MAX_COMPLETION_THREAD_COUNT)
{
sysinfo.dwNumberOfProcessors = MAX_COMPLETION_THREAD_COUNT;
}
// Round the buffer size to the next increment of the page size
if ((gBufferSize % sysinfo.dwPageSize) != 0)
{
gBufferSize = ((gBufferSize / sysinfo.dwPageSize) + 1) * sysinfo.dwPageSize;
}
printf("Buffer size = %d (page size = %lu)\n", gBufferSize, sysinfo.dwPageSize);
// Create the worker threads to service the completion notifications
for(waitcount=0; waitcount < (int)sysinfo.dwNumberOfProcessors ;waitcount++)
{
WaitEvents[waitcount] = CreateThread(NULL, 0, CompletionThread, (LPVOID)CompletionPort, 0, NULL);
if (WaitEvents[waitcount] == NULL)
{
fprintf(stderr, "CreateThread failed: %d\n", GetLastError());
return -1;
}
}
printf("Local address: %s; Port: %s; Family: %d\n", gBindAddr ? gBindAddr : "(any)", gBindPort, gAddressFamily);
// Obtain the "wildcard" addresses for all the available address families
res = ResolveAddress(gBindAddr, gBindPort, gAddressFamily, gSocketType, gProtocol);
if (res == NULL)
{
fprintf(stderr, "ResolveAddress failed to return any addresses!\n");
return -1;
}
// For each local address returned, create a listening/receiving socket
ptr = res;
while (ptr)
{
printf("Listening address: ");
PrintAddress(ptr->ai_addr, ptr->ai_addrlen);
printf("\n");
listenobj = (LISTEN_OBJ *)HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(LISTEN_OBJ));
if (listenobj == NULL)
{
fprintf(stderr, "Out of memory!\n");
return -1;
}
listenobj->LoWaterMark = gInitialAccepts;
InitializeCriticalSection(&listenobj->ListenCritSec);
// Save off the address family of this socket
listenobj->AddressFamily = ptr->ai_family;
// create the socket
listenobj->s = socket(ptr->ai_family, ptr->ai_socktype, ptr->ai_protocol);
if (listenobj->s == INVALID_SOCKET)
{
fprintf(stderr, "socket failed: %d\n", WSAGetLastError());
return -1;
}
// Create an event to register for FD_ACCEPT events on
listenobj->AcceptEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
if (listenobj->AcceptEvent == NULL)
{
fprintf(stderr, "CreateEvent failed: %d\n", GetLastError());
return -1;
}
listenobj->RepostAccept = CreateEvent(NULL, TRUE, FALSE, NULL);
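if (listenobj->RepostAccept == NULL)
{
fprintf(stderr, "CreateEvent failed: %d\n", GetLastError());
return -1;
}
// Add the events to the list of waiting events, making sure both fit in the array
if (waitcount + 2 > (int)(sizeof(WaitEvents) / sizeof(WaitEvents[0])))
{
fprintf(stderr, "Too many handles to wait on!\n");
return -1;
}
WaitEvents[waitcount++] = listenobj->AcceptEvent;
WaitEvents[waitcount++] = listenobj->RepostAccept;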
// Associate the socket and its LISTEN_OBJ to the completion port
hrc = CreateIoCompletionPort((HANDLE)listenobj->s, CompletionPort, (ULONG_PTR)listenobj, 0);
if (hrc == NULL)
{
fprintf(stderr, "CreateIoCompletionPort failed: %d\n", GetLastError());
return -1;
}
// bind the socket to a local address and port
rc = bind(listenobj->s, ptr->ai_addr, ptr->ai_addrlen);
if (rc == SOCKET_ERROR)
{
fprintf(stderr, "bind failed: %d\n", WSAGetLastError());
return -1;
}
// Need to load the Winsock extension functions from each provider
// -- e.g. AF_INET and AF_INET6.
rc = WSAIoctl(
listenobj->s,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&guidAcceptEx,
sizeof(guidAcceptEx),
&listenobj->lpfnAcceptEx,
sizeof(listenobj->lpfnAcceptEx),
&bytes,
NULL,
NULL
);
if (rc == SOCKET_ERROR)
{
fprintf(stderr, "WSAIoctl: SIO_GET_EXTENSION_FUNCTION_POINTER failed: %d\n", WSAGetLastError());
return -1;
}
// Load the Winsock extensions from each provider
rc = WSAIoctl(
listenobj->s,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&guidGetAcceptExSockaddrs,
sizeof(guidGetAcceptExSockaddrs),
&listenobj->lpfnGetAcceptExSockaddrs,
sizeof(listenobj->lpfnGetAcceptExSockaddrs),
&bytes,
NULL,
NULL
);
if (rc == SOCKET_ERROR)
{
fprintf(stderr, "WSAIoctl: SIO_GET_EXTENSION_FUNCTION_POINTER failed: %d\n", WSAGetLastError());
return -1;
}
// Put the socket into listening mode
rc = listen(listenobj->s, 200);
if (rc == SOCKET_ERROR)
{
fprintf(stderr, "listen failed: %d\n", WSAGetLastError());
return -1;
}
// Register for FD_ACCEPT notification on listening socket
rc = WSAEventSelect(listenobj->s, listenobj->AcceptEvent, FD_ACCEPT);
if (rc == SOCKET_ERROR)
{
fprintf(stderr, "WSAEventSelect failed: %d\n", WSAGetLastError());
return -1;
}
// Initiate the initial accepts for each listen socket
for(i=0; i < gInitialAccepts ;i++)
{
acceptobj = GetBufferObj(gBufferSize);
if (acceptobj == NULL)
{
fprintf(stderr, "Out of memory!\n");
return -1;
}
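acceptobj->PostAccept = listenobj->AcceptEvent;
InsertPendingAccept(listenobj, acceptobj);
if (PostAccept(listenobj, acceptobj) != NO_ERROR)
{
// The accept could not be posted: take the buffer back off the pending list
RemovePendingAccept(listenobj, acceptobj);
FreeBufferObj(acceptobj);
return -1;
}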
}
// Maintain a list of the listening socket structures
if (ListenSockets == NULL)
{
ListenSockets = listenobj;
}
else
{
listenobj->next = ListenSockets;
ListenSockets = listenobj;
}
endpointcount++;
ptr = ptr->ai_next;
}
// free the addrinfo structure for the 'bind' address
freeaddrinfo(res);
gStartTime = gStartTimeLast = GetTickCount();
interval = 0;
while (1)
{
rc = WSAWaitForMultipleEvents(waitcount, WaitEvents, FALSE, 5000, FALSE);
if (rc == WAIT_FAILED)
{
fprintf(stderr, "WSAWaitForMultipleEvents failed: %d\n", WSAGetLastError());
break;
}
else if (rc == WAIT_TIMEOUT)
{
interval++;
PrintStatistics();
if (interval == 36)
{
int optval, optlen;
// For TCP, cycle through all the outstanding AcceptEx operations
// to see if any of the client sockets have been connected but
// haven't received any data. If so, close them as they could be
// a denial of service attack.
listenobj = ListenSockets;
while (listenobj)
{
EnterCriticalSection(&listenobj->ListenCritSec);
acceptobj = listenobj->PendingAccepts;
while (acceptobj)
{
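if (acceptobj->sclient == INVALID_SOCKET)
{
// Already closed on an earlier pass; its AcceptEx will complete with an error
acceptobj = acceptobj->next;
continue;
}
optlen = sizeof(optval);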
rc = getsockopt( acceptobj->sclient, SOL_SOCKET, SO_CONNECT_TIME, (char *)&optval, &optlen);
if (rc == SOCKET_ERROR)
{
fprintf(stderr, "getsockopt: SO_CONNECT_TIME failed: %d\n", WSAGetLastError());
}
else
{
// If the socket has been connected for more than 5 minutes,
// close it. If closed, the AcceptEx call will fail in the completion thread.
if ((optval != 0xFFFFFFFF) && (optval > 300))
{
printf("closing stale handle\n");
closesocket(acceptobj->sclient);
acceptobj->sclient = INVALID_SOCKET;
}
}
acceptobj = acceptobj->next;
}
LeaveCriticalSection(&listenobj->ListenCritSec);
listenobj = listenobj->next;
}
interval = 0;
}
}
else
{
int index;
index = rc - WAIT_OBJECT_0;
for( ; index < waitcount ; index++)
{
rc = WaitForSingleObject(WaitEvents[index], 0);
if (rc == WAIT_FAILED || rc == WAIT_TIMEOUT)
{
continue;
}
if (index < (int)sysinfo.dwNumberOfProcessors)
{
// One of the completion threads exited
// This is bad so just bail - a real server would want
// to gracefully exit but this is just a sample ...
ExitProcess(-1);
}
else
{
// An FD_ACCEPT event occurred
listenobj = ListenSockets;
while (listenobj)
{
if ((listenobj->AcceptEvent == WaitEvents[index]) ||
(listenobj->RepostAccept == WaitEvents[index]))
break;
listenobj = listenobj->next;
}
if (listenobj)
{
WSANETWORKEVENTS ne;
int limit=0;
if (listenobj->AcceptEvent == WaitEvents[index])
{
// EnumNetworkEvents to see if FD_ACCEPT was set
rc = WSAEnumNetworkEvents(listenobj->s, listenobj->AcceptEvent,&ne);
if (rc == SOCKET_ERROR)
{
// ne is not filled in on failure, so do not inspect it
fprintf(stderr, "WSAEnumNetworkEvents failed: %d\n", WSAGetLastError());
}
else if ((ne.lNetworkEvents & FD_ACCEPT) == FD_ACCEPT)
{
// We got an FD_ACCEPT so post multiple accepts to cover the burst
limit = BURST_ACCEPT_COUNT;
}
}
else if (listenobj->RepostAccept == WaitEvents[index])
{
// The repost event is signaled: repost one accept for each one that completed
limit = InterlockedExchange(&listenobj->RepostCount, 0);
ResetEvent(listenobj->RepostAccept);
}
i = 0;
while ( (i++ < limit) && (listenobj->PendingAcceptCount < gMaxAccepts) )
{
acceptobj = GetBufferObj(gBufferSize);
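if (acceptobj)
{
acceptobj->PostAccept = listenobj->AcceptEvent;
InsertPendingAccept(listenobj, acceptobj);
if (PostAccept(listenobj, acceptobj) != NO_ERROR)
{
// The accept could not be posted: recycle the buffer and stop for now
RemovePendingAccept(listenobj, acceptobj);
FreeBufferObj(acceptobj);
break;
}
}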
}
}
}
}
}
}
WSACleanup();
return 0;
}
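The header comment of the listing describes keeping completed receive buffers in the order the receives were posted and having a routine (DoSends) send only those buffers that are available and in sequence. The listing on this page does not define DoSends, so the following is a minimal, portable C sketch of that idea, not the sample's actual code. The names recv_buf, reorder_queue, reorder_insert and reorder_pop_ready are hypothetical, and the sketch assumes each receive is stamped with a sequence number when it is posted and that the counter does not wrap.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for a completed receive buffer (not the sample's BUFFER_OBJ). */
typedef struct recv_buf {
    unsigned long seq;      /* order in which the receive was posted */
    size_t len;             /* number of bytes received */
    struct recv_buf *next;  /* next completed buffer, in ascending seq order */
} recv_buf;

/* Hypothetical per-connection state for echoing data back in order. */
typedef struct {
    unsigned long next_to_send; /* sequence number that must be sent next */
    recv_buf *pending;          /* completed receives, sorted by seq */
} reorder_queue;

/* Insert a completed receive, keeping the list sorted by sequence number. */
static void reorder_insert(reorder_queue *q, recv_buf *b)
{
    recv_buf **pp = &q->pending;

    /* A buffer older than the next expected one would already have been sent. */
    assert(b->seq >= q->next_to_send);

    while (*pp != NULL && (*pp)->seq < b->seq)
        pp = &(*pp)->next;
    b->next = *pp;
    *pp = b;
}

/* Return the next buffer that may be sent, or NULL if the list is empty or
 * there is a gap (an earlier receive has not completed yet). */
static recv_buf *reorder_pop_ready(reorder_queue *q)
{
    recv_buf *b = q->pending;

    if (b == NULL || b->seq != q->next_to_send)
        return NULL;
    q->pending = b->next;
    b->next = NULL;
    q->next_to_send++;
    return b;
}
```

In a server along the lines of this sample, reorder_insert would be called when a receive completes and reorder_pop_ready would be called in a loop before posting sends, both under the per-socket lock. If receive 1 completes before receive 0, the pop returns NULL until receive 0 has been inserted, after which buffers 0 and 1 come out in that order.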