WinSock I/O Models -- The IOCP Model

Oct 15, 2021



Preface


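IOCP stands for Input/Output Completion Ports, usually just called "completion ports". This article uses the abbreviation IOCP throughout.

IOCP is by far the most complex of the WinSock I/O models, but it is also the one that usually delivers the best performance. When a network application has to manage a large number of socket I/O requests, you may have no other realistic choice.

This article walks through an official IOCP demo program to show how IOCP is used. Because the topic is complex, the focus is on usage rather than internals; see the official documentation for more detail.

The official sample lives at: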

https://github.com/microsoft/Windows-classic-samples/tree/master/Samples/Win7Samples/netds/winsock/iocp/serverex

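I find the official demo code hard to read (both its formatting and a number of trivial details that could be omitted), so a trimmed-down version of the code is included at the end of this article for readers who want it.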


API Basics


The building blocks used in this article:

  • the OVERLAPPED structure
  • WSAEvent
  • CriticalSection
  • CreateThread

were all covered in an earlier article in this series, "WinSock I/O Models -- The OVERLAPPED I/O Model", so they are not repeated here.

CreateIoCompletionPort

CreateIoCompletionPort creates an IOCP handle, or associates an existing socket handle with an IOCP that has already been created.

HANDLE WINAPI CreateIoCompletionPort(
  _In_     HANDLE    FileHandle,
  _In_opt_ HANDLE    ExistingCompletionPort,
  _In_     ULONG_PTR CompletionKey,
  _In_     DWORD     NumberOfConcurrentThreads
);

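  1. FileHandle: the file handle to associate with ExistingCompletionPort (any handle that supports overlapped I/O, not only a socket handle). For a socket, this means the socket must have been created with the WSA_FLAG_OVERLAPPED flag. To create a new IOCP handle instead, pass INVALID_HANDLE_VALUE.
  2. ExistingCompletionPort: NULL, or an IOCP instance previously created with CreateIoCompletionPort. To create a new IOCP handle, pass NULL; the return value is then the newly created IOCP instance. To associate a FileHandle with an IOCP instance, pass the existing instance; on success the function returns ExistingCompletionPort.
  3. CompletionKey: a caller-defined value (typically a pointer to a data structure) associated with FileHandle. It is included in every I/O completion packet for that handle (the packet is explained below).
  4. NumberOfConcurrentThreads: the maximum number of threads the system allows to process I/O completion packets for this port concurrently. It is only used when a new IOCP instance is created and is ignored otherwise. If it is 0, the system allows as many concurrently running threads as there are processors.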

GetQueuedCompletionStatus

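GetQueuedCompletionStatus dequeues an I/O completion packet from the specified IOCP instance.

I/O completion packet: loosely speaking, when an IOCP instance is created, the system allocates a queue for it. The queue holds information about the asynchronous operations that have completed on every FileHandle associated with that IOCP. Each such entry in the queue is called an I/O completion packet.

This API removes I/O completion packets from that queue. Because it is a queue, even when several threads dequeue from the same IOCP instance at the same time, no two of them receive the same packet.

There is also a more advanced function, GetQueuedCompletionStatusEx; this article does not use it, so it is not covered here.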

BOOL GetQueuedCompletionStatus(
  HANDLE       CompletionPort,
  LPDWORD      lpNumberOfBytesTransferred,
  PULONG_PTR   lpCompletionKey,
  LPOVERLAPPED *lpOverlapped,
  DWORD        dwMilliseconds
);

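  1. CompletionPort: the IOCP instance.
  2. lpNumberOfBytesTransferred: the number of bytes transferred by the completed asynchronous operation. For a send, this is the number of bytes sent; for a receive, the number of bytes received.
  3. lpCompletionKey: receives the CompletionKey that was specified when the FileHandle was associated with the IOCP instance. When an operation on that FileHandle completes and its I/O completion packet is dequeued with GetQueuedCompletionStatus, this parameter holds that same key.
  4. lpOverlapped: receives the OVERLAPPED structure that was specified when the asynchronous operation was submitted. As mentioned before, an OVERLAPPED structure acts like the ID of an asynchronous operation: we supply one when starting the operation, and the operating system hands it back when the operation completes, which tells us which of our operations has finished. There are a few tricks to using this structure, explained later.
  5. dwMilliseconds: a timeout. If no I/O completion packet is dequeued within this time, the function returns FALSE. The example passes INFINITE so that the call blocks until at least one operation completes.

Return value:

When the function dequeues an I/O completion packet for a successful operation, it returns TRUE, and lpNumberOfBytesTransferred, lpOverlapped and lpCompletionKey are filled in with the data for that packet.

When the function fails, it returns FALSE. In that case the output parameters can be in one of the following states:

  • lpOverlapped is NULL: no completion information was dequeued from the IOCP instance. lpNumberOfBytesTransferred and lpCompletionKey do not contain valid information either.
  • lpOverlapped is not NULL: a packet was dequeued, but the asynchronous operation it describes failed. lpNumberOfBytesTransferred, lpOverlapped and lpCompletionKey describe the failed operation; call GetLastError for the detailed error.

When the function returns FALSE, lpOverlapped is NULL and GetLastError returns ERROR_ABANDONED_WAIT_0, the IOCP instance has been closed.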
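The return-value rules above can be condensed into a small decision function. The following is only a sketch: `classify_dequeue`, `DequeueOutcome` and the `DEMO_` constants are hypothetical names introduced here, and the error codes are redefined locally (with the values that `WAIT_TIMEOUT` and `ERROR_ABANDONED_WAIT_0` have in the Windows SDK) so that the logic can be read and compiled without the Windows headers. It also assumes the timeout case is reported as `WAIT_TIMEOUT`, which the text above does not spell out.

```c
#include <stddef.h>

/* Local stand-ins for the Win32 error codes (assumption: same numeric
   values as WAIT_TIMEOUT and ERROR_ABANDONED_WAIT_0 in <windows.h>). */
#define DEMO_WAIT_TIMEOUT            258UL
#define DEMO_ERROR_ABANDONED_WAIT_0  735UL

typedef enum {
    DEQUEUE_OK,           /* TRUE: a packet for a successful operation     */
    DEQUEUE_IO_FAILED,    /* FALSE, lpOverlapped != NULL: operation failed */
    DEQUEUE_TIMED_OUT,    /* FALSE, lpOverlapped == NULL, timeout elapsed  */
    DEQUEUE_PORT_CLOSED,  /* FALSE, lpOverlapped == NULL, port was closed  */
    DEQUEUE_OTHER_ERROR   /* FALSE, lpOverlapped == NULL, any other error  */
} DequeueOutcome;

/* succeeded:  the BOOL returned by GetQueuedCompletionStatus
   overlapped: the pointer it stored through lpOverlapped
   last_error: the value of GetLastError() read right after the call */
DequeueOutcome classify_dequeue(int succeeded, const void *overlapped,
                                unsigned long last_error)
{
    if (succeeded)
        return DEQUEUE_OK;
    if (overlapped != NULL)
        return DEQUEUE_IO_FAILED;
    if (last_error == DEMO_WAIT_TIMEOUT)
        return DEQUEUE_TIMED_OUT;
    if (last_error == DEMO_ERROR_ABANDONED_WAIT_0)
        return DEQUEUE_PORT_CLOSED;
    return DEQUEUE_OTHER_ERROR;
}
```

A worker loop could switch on the result: continue on a timeout, clean up the per-operation context on a failed I/O, and leave the loop once the port has been closed.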

HasOverlappedIoCompleted

HasOverlappedIoCompleted is a macro that checks whether a given asynchronous (overlapped) operation has completed, that is, whether it is no longer pending.

void HasOverlappedIoCompleted(
   lpOverlapped
);

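lpOverlapped: a pointer to the OVERLAPPED structure that was supplied when the asynchronous operation was started.

Only use this macro for an operation whose initiating call reported ERROR_IO_PENDING; if the operation is not pending, do not use it.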
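To make the behaviour concrete, here is a portable model of what the macro checks. It is a sketch, not the SDK header: `DemoOverlapped`, `DEMO_STATUS_PENDING` and `DemoHasOverlappedIoCompleted` are hypothetical stand-ins, modeled on my understanding that the real macro compares the `Internal` member of the OVERLAPPED structure against `STATUS_PENDING` (0x103).

```c
#include <stdint.h>

/* Stand-in for OVERLAPPED: only the member the check looks at is modeled. */
typedef struct {
    uintptr_t Internal;  /* status of the operation, maintained by the system */
} DemoOverlapped;

/* Assumption: same numeric value as STATUS_PENDING in the Windows SDK. */
#define DEMO_STATUS_PENDING 0x103u

/* "Completed" simply means "the status is no longer pending".
   Nothing is waited on; the macro only reads a field. */
#define DemoHasOverlappedIoCompleted(ov) \
    ((uint32_t)(ov)->Internal != DEMO_STATUS_PENDING)
```

Because the check only reads a field, a caller that wants to wait has to poll, typically by yielding between checks in a loop that calls `Sleep(0)`.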

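We now know how to create an IOCP instance and how to be notified when an asynchronous operation completes. Next, let's look at how to submit one.

Note that only the use of these APIs together with IOCP is covered; the other modes they support are left out.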

AcceptEx

AcceptEx accepts a new connection.

BOOL AcceptEx(
  SOCKET       sListenSocket,
  SOCKET       sAcceptSocket,
  PVOID        lpOutputBuffer,
  DWORD        dwReceiveDataLength,
  DWORD        dwLocalAddressLength,
  DWORD        dwRemoteAddressLength,
  LPDWORD      lpdwBytesReceived,
  LPOVERLAPPED lpOverlapped
);

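  1. sAcceptSocket: unlike accept, the connection is accepted asynchronously, so before calling this function we must create a socket handle ourselves to hold the newly accepted connection.
  2. lpOutputBuffer: while accepting the connection, the function can also return the new socket's local and remote addresses and receive the first block of data. The received data is written at the start of this buffer, and the address information follows it.
  3. dwReceiveDataLength: the number of bytes of the buffer to use for that first block of data. 0 means no data is received and only the connection is accepted; lpOutputBuffer then only holds the local and remote addresses.
  4. dwLocalAddressLength, dwRemoteAddressLength: the number of bytes reserved in lpOutputBuffer for the local and remote address information. Each must be at least 16 bytes larger than the maximum address length of the transport protocol in use, and cannot be 0.
  5. lpdwBytesReceived: receives the length of the first block of data. It is only meaningful when AcceptEx completes immediately; if the call reports ERROR_IO_PENDING, the value is not valid.
  6. lpOverlapped: the OVERLAPPED structure associated with this asynchronous accept.

Return value:

  • If the call completes immediately, it returns TRUE.
  • Otherwise it returns FALSE, and WSAGetLastError gives the specific error. If WSAGetLastError returns ERROR_IO_PENDING, the accept was submitted successfully and is still in progress.

Worth mentioning: the official documentation states that a program can accept a connection more quickly with AcceptEx than with accept.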
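The three length parameters have to add up to the size of lpOutputBuffer, which is easy to get wrong. The sketch below works out that arithmetic for a single buffer. `plan_accept_buffer` and `AcceptBufferLayout` are hypothetical helpers introduced for illustration; the 16-byte padding is the per-address minimum that AcceptEx requires, and the caller passes the maximum address size (for example `sizeof(SOCKADDR_STORAGE)`, which is 128 bytes).

```c
#include <assert.h>
#include <stddef.h>

#define DEMO_ADDR_PADDING 16u  /* extra bytes required per address block */

typedef struct {
    size_t data_len;          /* value for dwReceiveDataLength              */
    size_t addr_len;          /* value for dwLocalAddressLength and
                                 dwRemoteAddressLength                      */
    size_t addr_area_offset;  /* where the address information begins       */
} AcceptBufferLayout;

/* Split one buffer of buffer_size bytes into a data area followed by
   room for two address blocks of (max_addr_size + 16) bytes each. */
AcceptBufferLayout plan_accept_buffer(size_t buffer_size, size_t max_addr_size)
{
    AcceptBufferLayout layout;
    size_t addr_len = max_addr_size + DEMO_ADDR_PADDING;

    /* The buffer must at least hold both address blocks. */
    assert(buffer_size >= 2 * addr_len);

    layout.addr_len         = addr_len;
    layout.data_len         = buffer_size - 2 * addr_len;
    layout.addr_area_offset = layout.data_len;
    return layout;
}
```

With an 8192-byte buffer and 128-byte addresses this gives 144 bytes per address block and 7904 bytes for data, which matches the expression `MAX_BUFF_SIZE - (2 * (sizeof(SOCKADDR_STORAGE) + 16))` used in the sample's AcceptEx call. The address area itself is best decoded with GetAcceptExSockaddrs rather than read directly.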

WSARecv

WSARecv receives data from a connected socket.

int WSAAPI WSARecv(
  SOCKET                             s,
  LPWSABUF                           lpBuffers,
  DWORD                              dwBufferCount,
  LPDWORD                            lpNumberOfBytesRecvd,
  LPDWORD                            lpFlags,
  LPWSAOVERLAPPED                    lpOverlapped,
  LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
);

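The lpOverlapped parameter plays the same role here as in AcceptEx. The other parameters:

  • lpBuffers: an array of buffers (WSABUF structures) that receive the data.
  • dwBufferCount: the number of buffers in that array.
  • lpNumberOfBytesRecvd: if the receive completes immediately, the number of bytes received. If the operation is pending instead, the value is not valid.
  • lpCompletionRoutine: not used in this example, so it is NULL. Completion is reported asynchronously through GetQueuedCompletionStatus instead.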

WSASend

WSASend sends data on a connected socket.

int WSAAPI WSASend(
  SOCKET                             s,
  LPWSABUF                           lpBuffers,
  DWORD                              dwBufferCount,
  LPDWORD                            lpNumberOfBytesSent,
  DWORD                              dwFlags,
  LPWSAOVERLAPPED                    lpOverlapped,
  LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
);

This function is almost identical to WSARecv, so it is not described again.


Implementation Approach


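  1. Create a socket to act as the listening socket.
  2. Create an IOCP instance and associate the server socket with it.
  3. Submit an asynchronous accept with AcceptEx.
  4. Create several worker threads. Each one blocks in GetQueuedCompletionStatus waiting for completion notifications (I/O completion packets) and processes them.
  5. The main thread blocks until the server exits and the IOCP instance is closed.

Described like this the flow sounds simple, but it hides a great many details. Here is how this IOCP server is implemented, step by step:

  1. First, right after creating the server socket, we create the IOCP instance (with CreateIoCompletionPort) and associate the server socket with it (again with CreateIoCompletionPort). When making the association we supply a completion key (lpCompletionKey). That structure has to carry enough information for us to act on a completion notification for the server socket when it arrives.

Here is the structure the sample code uses as lpCompletionKey: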

typedef struct _PER_SOCKET_CONTEXT {
    SOCKET                      Socket;

    LPFN_ACCEPTEX               fnAcceptEx;

    PPER_IO_CONTEXT             pIOContext;
    struct _PER_SOCKET_CONTEXT  *pCtxtBack;
    struct _PER_SOCKET_CONTEXT  *pCtxtForward;
} PER_SOCKET_CONTEXT, *PPER_SOCKET_CONTEXT;

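Socket: when an operation completes we need to know which socket it belongs to. GetQueuedCompletionStatus does not return that, so we store it ourselves.

fnAcceptEx: this field exists because of how AcceptEx has to be called. Instead of calling AcceptEx directly, we first obtain a pointer to it at run time through WSAIoctl with SIO_GET_EXTENSION_FUNCTION_POINTER. The pointer is obtained through a particular server socket, so with several server sockets there would be several function pointers. That is why it has to be stored.

pIOContext: holds the data for the OVERLAPPED structure used by asynchronous operations on this socket (described in more detail below).

pCtxtBack and pCtxtForward: these are not essential. If you keep track of your _PER_SOCKET_CONTEXT structures some other way, neither field is needed.

  2. With the server socket bound to the IOCP, we need worker threads that call GetQueuedCompletionStatus to handle completed operations. The questions to weigh here: how many threads to use, and whether they should block indefinitely or poll with a timeout. Readers will need to decide this for their own workload.

  3. The server socket is now associated with the IOCP instance and the threads that handle completion notifications exist. How does the server socket start accepting connections? With accept? No: accept blocks, so we use AcceptEx to accept connections asynchronously. How?

To use AcceptEx we first need an OVERLAPPED structure. Could we just create a bare OVERLAPPED instance? It would work, but none of the IOCP implementations I have seen do that (I have read two besides Microsoft's official demo, libuv among them).

What they all do is wrap the OVERLAPPED structure inside another structure. The one in the demo looks like this: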

typedef struct _PER_IO_CONTEXT {
    WSAOVERLAPPED               Overlapped;
    char                        Buffer[MAX_BUFF_SIZE];
    WSABUF                      wsabuf;
    int                         nTotalBytes;
    int                         nSentBytes;
    IO_OPERATION                IOOperation;
    SOCKET                      SocketAccept;

    struct _PER_IO_CONTEXT      *pIOContextForward;
} PER_IO_CONTEXT, *PPER_IO_CONTEXT;

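Note that _PER_IO_CONTEXT is reachable from _PER_SOCKET_CONTEXT (our lpCompletionKey) through its pIOContext pointer.

Overlapped: obviously required.

IOOperation: the type of the current asynchronous operation. Its type, IO_OPERATION, has three values: accept, read and write.

SocketAccept: if the current operation is an accept, this field stores the socket for the newly accepted connection.

wsabuf: the structure passed to WSARecv or WSASend when a read or write is submitted.

Buffer: where the data actually lives. A WSABUF only contains a pointer to a buffer and the buffer's length. With the demo designed this way, the pointer in wsabuf points at Buffer, and all data sent or received is stored there.

nTotalBytes, nSentBytes: the number of bytes to send (set from the number of bytes just received, since the demo echoes data back) and the number of bytes sent so far.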
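As a sketch of how two such counters can drive a send that completes in several pieces: after each completion the sent count is advanced, and if data remains, the next send resumes from the first unsent byte. `SendProgress` and `advance_send` are hypothetical names for illustration; in the demo the same bookkeeping is done on nTotalBytes, nSentBytes and Buffer.

```c
#include <assert.h>
#include <stddef.h>

typedef struct {
    char  *buffer;       /* start of the data being sent (role of Buffer) */
    size_t total_bytes;  /* role of nTotalBytes                           */
    size_t sent_bytes;   /* role of nSentBytes                            */
} SendProgress;

/* Record that a send of just_sent bytes has completed.
   Returns the number of bytes still to send; 0 means the whole buffer
   went out. When the result is non-zero, *next is set to the address
   the next send should start from. */
size_t advance_send(SendProgress *p, size_t just_sent, char **next)
{
    /* A completion can never report more than was still outstanding. */
    assert(p->sent_bytes + just_sent <= p->total_bytes);

    p->sent_bytes += just_sent;
    if (p->sent_bytes < p->total_bytes) {
        *next = p->buffer + p->sent_bytes;
        return p->total_bytes - p->sent_bytes;
    }
    return 0;
}
```

The returned length and `*next` correspond to the `len` and `buf` members of the WSABUF passed to the follow-up WSASend; once the function returns 0, the connection can go back to receiving.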

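pIOContextForward: this field exists because a socket is associated with one _PER_SOCKET_CONTEXT, and a _PER_SOCKET_CONTEXT refers to only one _PER_IO_CONTEXT (that is, one OVERLAPPED structure). How, then, do we handle several asynchronous operations on one socket? That takes several _PER_IO_CONTEXT instances, which is where this linked list comes in.

The one thing worth noting here: the OVERLAPPED structure is the first field of _PER_IO_CONTEXT. The benefit is that when GetQueuedCompletionStatus hands back the lpOverlapped of the completed operation, we can cast that pointer directly to a _PER_IO_CONTEXT pointer and find out exactly which I/O operation it was. Meanwhile _PER_SOCKET_CONTEXT comes back from GetQueuedCompletionStatus as lpCompletionKey, so we have the full context of the socket.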
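The cast works because C guarantees that a pointer to a structure and a pointer to its first member refer to the same address. Below is a portable sketch of the idea; `DemoOverlapped`, `DemoIoContext` and both helper functions are hypothetical stand-ins, not the demo's types. The second helper shows the offsetof-based recovery (the idea behind the Windows CONTAINING_RECORD macro), which keeps working if the member is not first.

```c
#include <assert.h>
#include <stddef.h>

/* Stand-in for WSAOVERLAPPED; its contents do not matter here. */
typedef struct { void *reserved[4]; } DemoOverlapped;

typedef enum { OpAccept, OpRead, OpWrite } DemoOperation;

typedef struct {
    DemoOverlapped overlapped;  /* must stay first for the plain cast */
    DemoOperation  operation;
} DemoIoContext;

/* Plain cast: only valid while `overlapped` is the first member. */
DemoIoContext *context_from_overlapped(DemoOverlapped *ov)
{
    assert(offsetof(DemoIoContext, overlapped) == 0);
    return (DemoIoContext *)ov;
}

/* Offset-based recovery: subtract the member's offset from its address.
   Works wherever the member sits inside the structure. */
DemoIoContext *context_from_overlapped_any(DemoOverlapped *ov)
{
    return (DemoIoContext *)((char *)ov - offsetof(DemoIoContext, overlapped));
}
```

Either helper turns the pointer that comes back through lpOverlapped into the enclosing context, from which the operation type can be read.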

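With this design, one _PER_IO_CONTEXT corresponds to one asynchronous operation; a socket with several operations in flight needs several _PER_IO_CONTEXT structures. The way the demo lays out this structure leaves a lot to reconsider for real-world use. At this point, using WSARecv and WSASend should no longer be difficult.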


Example


There is a lot of code; take your time reading through it.

// THIS CODE AND INFORMATION IS PROVIDED "AS IS" WITHOUT WARRANTY OF
// ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO
// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND/OR FITNESS FOR A
// PARTICULAR PURPOSE.
//
// Copyright (C) Microsoft Corporation.  All Rights Reserved.
//

#pragma warning (disable:4127)
#pragma comment(lib,"ws2_32.lib")

#include <winsock2.h>
#include <mswsock.h>
#include <Ws2tcpip.h>
#include <stdio.h>
#include <stdlib.h>
#include <strsafe.h>

#define DEFAULT_PORT        "5001"
#define MAX_BUFF_SIZE       8192
#define MAX_WORKER_THREAD   16

#define xmalloc(s) HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, (s))
#define xfree(p)   HeapFree(GetProcessHeap(), 0, (p))

typedef enum _IO_OPERATION {
    ClientIoAccept,
    ClientIoRead,
    ClientIoWrite
} IO_OPERATION, *PIO_OPERATION;

typedef struct _PER_IO_CONTEXT {
    WSAOVERLAPPED               Overlapped;
    char                        Buffer[MAX_BUFF_SIZE];
    WSABUF                      wsabuf;
    int                         nTotalBytes;
    int                         nSentBytes;
    IO_OPERATION                IOOperation;
    SOCKET                      SocketAccept;

    struct _PER_IO_CONTEXT      *pIOContextForward;
} PER_IO_CONTEXT, *PPER_IO_CONTEXT;

// Used as lpCompletionKey.
// Each socket has one _PER_SOCKET_CONTEXT structure.
// The asynchronous operations on that socket are stored in pIOContext, which is
// the head of a linked list, so treat pIOContext as a dynamically sized collection.
typedef struct _PER_SOCKET_CONTEXT {
    SOCKET                      Socket;
    LPFN_ACCEPTEX               fnAcceptEx;
    PPER_IO_CONTEXT             pIOContext;

    struct _PER_SOCKET_CONTEXT  *pCtxtBack;
    struct _PER_SOCKET_CONTEXT  *pCtxtForward;
} PER_SOCKET_CONTEXT, *PPER_SOCKET_CONTEXT;

BOOL CreateListenSocket(void);
BOOL CreateAcceptSocket(BOOL fUpdateIOCP);
DWORD WINAPI WorkerThread(LPVOID WorkContext);

PPER_SOCKET_CONTEXT UpdateCompletionPort(SOCKET s, IO_OPERATION ClientIo, BOOL bAddToList);

PPER_SOCKET_CONTEXT CtxtAllocate(SOCKET s, IO_OPERATION ClientIO);
VOID CloseClient(PPER_SOCKET_CONTEXT lpPerSocketContext, BOOL bGraceful);
VOID CtxtListFree();
VOID CtxtListAddTo(PPER_SOCKET_CONTEXT lpPerSocketContext);
VOID CtxtListDeleteFrom(PPER_SOCKET_CONTEXT lpPerSocketContext);

BOOL                g_bEndServer                        = FALSE;
BOOL                g_bRestart                          = TRUE;
HANDLE              g_hIOCP                             = INVALID_HANDLE_VALUE;
SOCKET              g_sdListen                          = INVALID_SOCKET;
HANDLE              g_ThreadHandles[MAX_WORKER_THREAD];
WSAEVENT            g_hCleanupEvent[1];
PPER_SOCKET_CONTEXT g_pCtxtListenSocket                 = NULL;
PPER_SOCKET_CONTEXT g_pCtxtList                         = NULL;
CRITICAL_SECTION    g_CriticalSection;

int myprintf(const char *lpFormat, ...);

int main(void) {

    SYSTEM_INFO systemInfo;
    WSADATA     wsaData;
    DWORD       dwThreadCount = 0;
    int         nRet = 0;
    HANDLE      hThread;
    DWORD       dwThreadId;

    for (int i = 0; i < MAX_WORKER_THREAD; i++) {
        g_ThreadHandles[i] = INVALID_HANDLE_VALUE;
    }

    GetSystemInfo(&systemInfo);
    dwThreadCount = systemInfo.dwNumberOfProcessors * 2;
    // g_ThreadHandles only has room for MAX_WORKER_THREAD handles,
    // so never start more workers than that.
    if (dwThreadCount > MAX_WORKER_THREAD)
        dwThreadCount = MAX_WORKER_THREAD;

    if (WSA_INVALID_EVENT == (g_hCleanupEvent[0] = WSACreateEvent())) {
        myprintf("WSACreateEvent() failed: %d\n", WSAGetLastError());
        return 1;
    }

    if ((nRet = WSAStartup(0x202, &wsaData)) != 0) {
        myprintf("WSAStartup() failed: %d\n",nRet);
        if(g_hCleanupEvent[0] != WSA_INVALID_EVENT) {
            WSACloseEvent(g_hCleanupEvent[0]);
            g_hCleanupEvent[0] = WSA_INVALID_EVENT;
        }
        return 1;
    }

    InitializeCriticalSection(&g_CriticalSection);

    while (g_bRestart) {
        g_bRestart = FALSE;
        g_bEndServer = FALSE;
        WSAResetEvent(g_hCleanupEvent[0]);

        // Create the IOCP instance
        g_hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
        if (g_hIOCP == NULL) {
            myprintf("CreateIoCompletionPort() failed to create I/O completion port: %d\n", GetLastError());
            goto done;
        }

        // Start the worker threads that handle completion notifications
        for (DWORD dwCPU=0; dwCPU<dwThreadCount; dwCPU++) {
            // Create worker threads to service the overlapped I/O requests.  The decision
            // to create 2 worker threads per CPU in the system is a heuristic.  Also,
            // note that thread handles are closed right away, because we will not need them
            // and the worker threads will continue to execute.
            hThread = CreateThread(NULL, 0, WorkerThread, g_hIOCP, 0, &dwThreadId);
            if (hThread == NULL) {
                myprintf("CreateThread() failed to create worker thread: %d\n", GetLastError());
                goto done;
            }
            g_ThreadHandles[dwCPU] = hThread;
            hThread = INVALID_HANDLE_VALUE;
        }

        if (!CreateListenSocket())
            goto done;

        // Submit the first accept
        if (!CreateAcceptSocket(TRUE))
            goto done;

        // Block the main thread until the server exits
        WSAWaitForMultipleEvents(1, g_hCleanupEvent, TRUE, WSA_INFINITE, FALSE);

done:
        // The server is exiting: clean up
        g_bEndServer = TRUE;

        // Cause worker threads to exit.
        // The workers call GetQueuedCompletionStatus with a timeout of INFINITE,
        // so we manually post one I/O completion packet per worker to the IOCP
        // instance. Each worker dequeues one of these packets and exits;
        // otherwise the workers would never be able to exit.
        if (g_hIOCP) {
            for (DWORD i = 0; i < dwThreadCount; i++) {
                PostQueuedCompletionStatus(g_hIOCP, 0, 0, NULL);
            }
        }

        // Make sure worker threads exits.
        if (WAIT_OBJECT_0 != WaitForMultipleObjects(dwThreadCount,  g_ThreadHandles, TRUE, 1000)) {
            myprintf("WaitForMultipleObjects() failed: %d\n", GetLastError());
        } else {
            for (DWORD i=0; i<dwThreadCount; i++) {
                if (g_ThreadHandles[i] != INVALID_HANDLE_VALUE)
                    CloseHandle(g_ThreadHandles[i]);
                g_ThreadHandles[i] = INVALID_HANDLE_VALUE;
            }
        }

        if (g_sdListen != INVALID_SOCKET) {
            closesocket(g_sdListen);
            g_sdListen = INVALID_SOCKET;
        }

        if (g_pCtxtListenSocket) {
            // If an asynchronous operation is still in progress on the server
            // socket, wait for it to finish before cleaning up
            while (!HasOverlappedIoCompleted((LPOVERLAPPED)&g_pCtxtListenSocket->pIOContext->Overlapped))
                Sleep(0);

            if (g_pCtxtListenSocket->pIOContext->SocketAccept != INVALID_SOCKET)
                closesocket(g_pCtxtListenSocket->pIOContext->SocketAccept);
            g_pCtxtListenSocket->pIOContext->SocketAccept = INVALID_SOCKET;

            if (g_pCtxtListenSocket->pIOContext)
                xfree(g_pCtxtListenSocket->pIOContext);

            if (g_pCtxtListenSocket)
                xfree(g_pCtxtListenSocket);
            g_pCtxtListenSocket = NULL;
        }

        CtxtListFree();

        if (g_hIOCP) {
            CloseHandle(g_hIOCP);
            g_hIOCP = NULL;
        }
    } //while (g_bRestart)

    DeleteCriticalSection(&g_CriticalSection);
    if (g_hCleanupEvent[0] != WSA_INVALID_EVENT) {
        WSACloseEvent(g_hCleanupEvent[0]);
        g_hCleanupEvent[0] = WSA_INVALID_EVENT;
    }
    WSACleanup();
    return 0;
} //main

SOCKET CreateSocket() {
    int    nRet     = 0;
    int    nZero    = 0;
    SOCKET sdSocket = INVALID_SOCKET;

    sdSocket = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_IP, NULL, 0, WSA_FLAG_OVERLAPPED);
    if (sdSocket == INVALID_SOCKET) {
        myprintf("WSASocket(sdSocket) failed: %d\n", WSAGetLastError());
        return(sdSocket);
    }

    //
    // Disable send buffering on the socket.  Setting SO_SNDBUF
    // to 0 causes winsock to stop buffering sends and perform
    // sends directly from our buffers, thereby save one memory copy.
    //
    // However, this does prevent the socket from ever filling the
    // send pipeline. This can lead to packets being sent that are
    // not full (i.e. the overhead of the IP and TCP headers is
    // great compared to the amount of data being carried).
    //
    // Disabling the send buffer has less serious repercussions
    // than disabling the receive buffer.
    //
    nZero = 0;
    nRet = setsockopt(sdSocket, SOL_SOCKET, SO_SNDBUF, (char *)&nZero, sizeof(nZero));
    if (nRet == SOCKET_ERROR) {
        myprintf("setsockopt(SNDBUF) failed: %d\n", WSAGetLastError());
        return(sdSocket);
    }

    //
    // Don't disable receive buffering. This will cause poor network
    // performance since if no receive is posted and no receive buffers,
    // the TCP stack will set the window size to zero and the peer will
    // no longer be allowed to send data.
    //

    //
    // Do not set a linger value...especially don't set it to an abortive
    // close. If you set abortive close and there happens to be a bit of
    // data remaining to be transferred (or data that has not been
    // acknowledged by the peer), the connection will be forcefully reset
    // and will lead to a loss of data (i.e. the peer won't get the last
    // bit of data). This is BAD. If you are worried about malicious
    // clients connecting and then not sending or receiving, the server
    // should maintain a timer on each connection. If after some point,
    // the server deems a connection is "stale" it can then set linger
    // to be abortive and close the connection.
    //

    /*
    LINGER lingerStruct;
    lingerStruct.l_onoff = 1;
    lingerStruct.l_linger = 0;
    nRet = setsockopt(sdSocket, SOL_SOCKET, SO_LINGER,
                      (char *)&lingerStruct, sizeof(lingerStruct));
    if( nRet == SOCKET_ERROR ) {
        myprintf("setsockopt(SO_LINGER) failed: %d\n", WSAGetLastError());
        return(sdSocket);
    }
    */

    return(sdSocket);
}

BOOL CreateListenSocket(void) {

    int    nRet                = 0;
    LINGER lingerStruct;
    struct addrinfo hints      = {0};
    struct addrinfo *addrlocal = NULL;

    lingerStruct.l_onoff  = 1;
    lingerStruct.l_linger = 0;

    hints.ai_flags    = AI_PASSIVE;
    hints.ai_family   = AF_INET;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_protocol = IPPROTO_IP;

    if (getaddrinfo(NULL, DEFAULT_PORT, &hints, &addrlocal) != 0) {
        myprintf("getaddrinfo() failed with error %d\n", WSAGetLastError());
        return FALSE;
    }

    if (addrlocal == NULL) {
        myprintf("getaddrinfo() failed to resolve/convert the interface\n");
        return FALSE;
    }

    g_sdListen = CreateSocket();
    if (g_sdListen == INVALID_SOCKET) {
        freeaddrinfo(addrlocal);
        return FALSE;
    }

    nRet = bind(g_sdListen, addrlocal->ai_addr, (int) addrlocal->ai_addrlen);
    if (nRet == SOCKET_ERROR) {
        myprintf("bind() failed: %d\n", WSAGetLastError());
        freeaddrinfo(addrlocal);
        return FALSE;
    }

    nRet = listen(g_sdListen, 5);
    if (nRet == SOCKET_ERROR) {
        myprintf("listen() failed: %d\n", WSAGetLastError());
        freeaddrinfo(addrlocal);
        return FALSE;
    }

    freeaddrinfo(addrlocal);

    return TRUE;
}

//
// Create a socket and invoke AcceptEx.  Only the original call to this
// function needs to be added to the IOCP.
//
// If the expected behaviour of connecting client applications is to NOT
// send data right away, then only posting one AcceptEx can cause connection
// attempts to be refused if a client connects without sending some initial
// data (notice that the associated iocpclient does not operate this way
// but instead makes a connection and starts sending data right away).
// This is because the IOCP packet does not get delivered without the initial
// data (as implemented in this sample) thus preventing the worker thread
// from posting another AcceptEx and eventually the backlog value set in
// listen() will be exceeded if clients continue to try to connect.
//
// One technique to address this situation is to simply cause AcceptEx
// to return right away upon accepting a connection without returning any
// data.  This can be done by setting dwReceiveDataLength=0 when calling AcceptEx.
//
// Another technique to address this situation is to post multiple calls
// to AcceptEx.  Posting multiple calls to AcceptEx is similar in concept to
// increasing the backlog value in listen(), though posting AcceptEx is
// dynamic (i.e. during the course of running your application you can adjust
// the number of AcceptEx calls you post).  It is important however to keep
// your backlog value in listen() high in your server to ensure that the
// stack can accept connections even if your application does not get enough
// CPU cycles to repost another AcceptEx under stress conditions.
//
// This sample implements neither of these techniques and is therefore
// susceptible to the behaviour described above.
//
BOOL CreateAcceptSocket(BOOL fUpdateIOCP) {

    int   nRet           = 0;
    DWORD dwRecvNumBytes = 0;
    DWORD bytes          = 0;

    GUID acceptex_guid = WSAID_ACCEPTEX;

    //The context for listening socket uses the SockAccept member to store the
    //socket for client connection.
    if (fUpdateIOCP) {
        g_pCtxtListenSocket = UpdateCompletionPort(g_sdListen, ClientIoAccept, FALSE);
        if (g_pCtxtListenSocket == NULL) {
            myprintf("failed to update listen socket to IOCP\n");
            return FALSE;
        }

        // Obtain the AcceptEx function pointer at run time
        // and store it in the corresponding socket context
        nRet = WSAIoctl(
            g_sdListen,
            SIO_GET_EXTENSION_FUNCTION_POINTER,
            &acceptex_guid,
            sizeof(acceptex_guid),
            &g_pCtxtListenSocket->fnAcceptEx,
            sizeof(g_pCtxtListenSocket->fnAcceptEx),
            &bytes,
            NULL,
            NULL
        );
        if (nRet == SOCKET_ERROR) {
            myprintf("failed to load AcceptEx: %d\n", WSAGetLastError());
            return FALSE;
        }
    }

    g_pCtxtListenSocket->pIOContext->SocketAccept = CreateSocket();
    if (g_pCtxtListenSocket->pIOContext->SocketAccept == INVALID_SOCKET) {
        myprintf("failed to create new accept socket\n");
        return FALSE;
    }

    // Submit the accept.
    // Here we expect to receive a first block of data from the new socket
    // at the same time as the connection is accepted.
    nRet = g_pCtxtListenSocket->fnAcceptEx(
        g_sdListen,
        g_pCtxtListenSocket->pIOContext->SocketAccept,
        (LPVOID)(g_pCtxtListenSocket->pIOContext->Buffer),
        MAX_BUFF_SIZE - (2 * (sizeof(SOCKADDR_STORAGE) + 16)),
        sizeof(SOCKADDR_STORAGE) + 16,
        sizeof(SOCKADDR_STORAGE) + 16,
        &dwRecvNumBytes,
        (LPOVERLAPPED) &(g_pCtxtListenSocket->pIOContext->Overlapped)
    );
    // AcceptEx returns a BOOL: FALSE (not SOCKET_ERROR) means the call either
    // failed or is pending, so compare against FALSE here.
    if (!nRet && (ERROR_IO_PENDING != WSAGetLastError())) {
        myprintf("AcceptEx() failed: %d\n", WSAGetLastError());
        return FALSE;
    }

    return TRUE;
}

DWORD WINAPI WorkerThread (LPVOID WorkThreadContext) {

    HANDLE              hIOCP                 = (HANDLE)WorkThreadContext;
    BOOL                bSuccess              = FALSE;
    int                 nRet                  = 0;
    LPWSAOVERLAPPED     lpOverlapped          = NULL;
    PPER_SOCKET_CONTEXT lpPerSocketContext    = NULL;
    PPER_SOCKET_CONTEXT lpAcceptSocketContext = NULL;
    PPER_IO_CONTEXT     lpIOContext           = NULL;
    WSABUF              buffRecv;
    WSABUF              buffSend;
    DWORD               dwRecvNumBytes        = 0;
    DWORD               dwSendNumBytes        = 0;
    DWORD               dwFlags               = 0;
    DWORD               dwIoSize              = 0;
    HRESULT             hRet;

    while (TRUE) {
        // Block until a completion notification arrives.
        // If there is none, keep waiting.
        bSuccess = GetQueuedCompletionStatus(
            hIOCP,
            &dwIoSize,
            (PDWORD_PTR)&lpPerSocketContext,
            (LPOVERLAPPED *)&lpOverlapped,
            INFINITE
        );

        if (!bSuccess)
            myprintf("GetQueuedCompletionStatus() failed: %d\n", GetLastError());

        // When the server exits, the packet posted with PostQueuedCompletionStatus
        // triggers this case, and this worker thread can exit normally.
        if (lpPerSocketContext == NULL) {
            return 0;
        }

        if (g_bEndServer) {
            return 0;
        }

        lpIOContext = (PPER_IO_CONTEXT)lpOverlapped;

        //
        //We should never skip the loop and not post another AcceptEx if the current
        //completion packet is for previous AcceptEx
        //
        if (lpIOContext->IOOperation != ClientIoAccept) {
            if (!bSuccess || (bSuccess && (0 == dwIoSize))) {
                CloseClient(lpPerSocketContext, FALSE);
                continue;
            }
        }

        //
        // determine what type of IO packet has completed by checking the PER_IO_CONTEXT
        // associated with this socket.  This will determine what action to take.
        //
        switch (lpIOContext->IOOperation) {

        case ClientIoAccept:

            //
            // When the AcceptEx function returns, the socket sAcceptSocket is
            // in the default state for a connected socket. The socket sAcceptSocket
            // does not inherit the properties of the socket associated with
            // sListenSocket parameter until SO_UPDATE_ACCEPT_CONTEXT is set on
            // the socket. Use the setsockopt function to set the SO_UPDATE_ACCEPT_CONTEXT
            // option, specifying sAcceptSocket as the socket handle and sListenSocket
            // as the option value.
            //
            nRet = setsockopt(
                lpPerSocketContext->pIOContext->SocketAccept,
                SOL_SOCKET,
                SO_UPDATE_ACCEPT_CONTEXT,
                (char *)&g_sdListen,
                sizeof(g_sdListen)
            );

            if (nRet == SOCKET_ERROR) {
                //
                //just warn user here.
                //
                myprintf("setsockopt(SO_UPDATE_ACCEPT_CONTEXT) failed to update accept socket\n");
                WSASetEvent(g_hCleanupEvent[0]);
                return 0;
            }

            lpAcceptSocketContext = UpdateCompletionPort(lpPerSocketContext->pIOContext->SocketAccept, ClientIoAccept, TRUE);

            if (lpAcceptSocketContext == NULL) {
                //
                //just warn user here.
                //
                myprintf("failed to update accept socket to IOCP\n");
                WSASetEvent(g_hCleanupEvent[0]);
                return 0;
            }

            if (dwIoSize) {
                lpAcceptSocketContext->pIOContext->IOOperation  = ClientIoWrite;
                lpAcceptSocketContext->pIOContext->nTotalBytes  = dwIoSize;
                lpAcceptSocketContext->pIOContext->nSentBytes   = 0;
                lpAcceptSocketContext->pIOContext->wsabuf.len   = dwIoSize;
                hRet = StringCbCopyNA(
                    lpAcceptSocketContext->pIOContext->Buffer,
                    MAX_BUFF_SIZE,
                    lpPerSocketContext->pIOContext->Buffer,
                    sizeof(lpPerSocketContext->pIOContext->Buffer)
                );
                lpAcceptSocketContext->pIOContext->wsabuf.buf = lpAcceptSocketContext->pIOContext->Buffer;

                nRet = WSASend(
                    lpPerSocketContext->pIOContext->SocketAccept,
                    &lpAcceptSocketContext->pIOContext->wsabuf,
                    1,
                    &dwSendNumBytes,
                    0,
                    &(lpAcceptSocketContext->pIOContext->Overlapped),
                    NULL
                );

                if (nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError())) {
                    myprintf ("WSASend() failed: %d\n", WSAGetLastError());
                    CloseClient(lpAcceptSocketContext, FALSE);
                } else {
                    myprintf("WorkerThread %d: Socket(%d) AcceptEx completed (%d bytes), Send posted\n", GetCurrentThreadId(), lpPerSocketContext->Socket, dwIoSize);
                }
            } else {
                //
                // AcceptEx completes but doesn't read any data so we need to post
                // an outstanding overlapped read.
                //
                lpAcceptSocketContext->pIOContext->IOOperation = ClientIoRead;
                dwRecvNumBytes = 0;
                dwFlags = 0;
                buffRecv.buf = lpAcceptSocketContext->pIOContext->Buffer;
                buffRecv.len = MAX_BUFF_SIZE;
                nRet = WSARecv(
                    lpAcceptSocketContext->Socket,
                    &buffRecv,
                    1,
                    &dwRecvNumBytes,
                    &dwFlags,
                    &lpAcceptSocketContext->pIOContext->Overlapped,
                    NULL
                );
                if (nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError())) {
                    myprintf ("WSARecv() failed: %d\n", WSAGetLastError());
                    CloseClient(lpAcceptSocketContext, FALSE);
                }
            }

            //
            //Time to post another outstanding AcceptEx
            //
            if (!CreateAcceptSocket(FALSE)) {
                myprintf("Please shut down and reboot the server.\n");
                WSASetEvent(g_hCleanupEvent[0]);
                return(0);
            }
            break;

        case ClientIoRead:

            //
            // a read operation has completed, post a write operation to echo the
            // data back to the client using the same data buffer.
            //
            lpIOContext->IOOperation = ClientIoWrite;
            lpIOContext->nTotalBytes = dwIoSize;
            lpIOContext->nSentBytes  = 0;
            lpIOContext->wsabuf.len  = dwIoSize;
            dwFlags = 0;
            nRet = WSASend(
                lpPerSocketContext->Socket,
                &lpIOContext->wsabuf,
                1,
                &dwSendNumBytes,
                dwFlags,
                &(lpIOContext->Overlapped),
                NULL
            );
            if (nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError())) {
                myprintf("WSASend() failed: %d\n", WSAGetLastError());
                CloseClient(lpPerSocketContext, FALSE);
            } else {
                myprintf("WorkerThread %d: Socket(%d) Recv completed (%d bytes), Send posted\n", GetCurrentThreadId(), lpPerSocketContext->Socket, dwIoSize);
            }
            break;

        case ClientIoWrite:

            //
            // a write operation has completed, determine if all the data intended to be
            // sent actually was sent.
            //
            lpIOContext->IOOperation = ClientIoWrite;
            lpIOContext->nSentBytes  += dwIoSize;
            dwFlags = 0;
            if (lpIOContext->nSentBytes < lpIOContext->nTotalBytes) {
                //
                // the previous write operation didn't send all the data,
                // post another send to complete the operation
                //
                buffSend.buf = lpIOContext->Buffer + lpIOContext->nSentBytes;
                buffSend.len = lpIOContext->nTotalBytes - lpIOContext->nSentBytes;
                nRet = WSASend (
                    lpPerSocketContext->Socket,
                    &buffSend,
                    1,
                    &dwSendNumBytes,
                    dwFlags,
                    &(lpIOContext->Overlapped),
                    NULL
                );
                if (nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError())) {
                    myprintf ("WSASend() failed: %d\n", WSAGetLastError());
                    CloseClient(lpPerSocketContext, FALSE);
                } else {
                    myprintf("WorkerThread %d: Socket(%d) Send partially completed (%d bytes), Send posted\n", GetCurrentThreadId(), lpPerSocketContext->Socket, dwIoSize);
                }
            } else {

                //
                // previous write operation completed for this socket, post another recv
                //
                lpIOContext->IOOperation = ClientIoRead;
                dwRecvNumBytes = 0;
                dwFlags = 0;
                buffRecv.buf = lpIOContext->Buffer;
                buffRecv.len = MAX_BUFF_SIZE;
                nRet = WSARecv(
                    lpPerSocketContext->Socket,
                    &buffRecv,
                    1,
                    &dwRecvNumBytes,
                    &dwFlags,
                    &lpIOContext->Overlapped,
                    NULL
                );
                if (nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError())) {
                    myprintf ("WSARecv() failed: %d\n", WSAGetLastError());
                    CloseClient(lpPerSocketContext, FALSE);
                } else {
                    myprintf("WorkerThread %d: Socket(%d) Send completed (%d bytes), Recv posted\n", GetCurrentThreadId(), lpPerSocketContext->Socket, dwIoSize);
                }
            }
            break;

        } //switch
    } //while
    return 0;
}

//
//  Allocate a context structure for the socket and add the socket to the IOCP.
//  Additionally, add the context structure to the global list of context structures.
//
PPER_SOCKET_CONTEXT UpdateCompletionPort(SOCKET sd, IO_OPERATION ClientIo, BOOL bAddToList) {

	PPER_SOCKET_CONTEXT lpPerSocketContext;
	HANDLE hPort;

	lpPerSocketContext = CtxtAllocate(sd, ClientIo);
	if (lpPerSocketContext == NULL)
		return NULL;

	//
	// Keep the result in a local variable so that a failed call does not
	// overwrite (and lose) the existing g_hIOCP handle. On success the
	// returned handle is the same as g_hIOCP.
	//
	hPort = CreateIoCompletionPort((HANDLE)sd, g_hIOCP, (DWORD_PTR)lpPerSocketContext, 0);
	if (hPort == NULL) {
		myprintf("CreateIoCompletionPort() failed: %d\n", GetLastError());
		if (lpPerSocketContext->pIOContext)
			xfree(lpPerSocketContext->pIOContext);
		xfree(lpPerSocketContext);
		return NULL;
	}

	//
	//The listening socket context (bAddToList is FALSE) is not added to the list.
	//All other socket contexts are added to the list.
	//
	if (bAddToList) CtxtListAddTo(lpPerSocketContext);

	myprintf("UpdateCompletionPort: Socket(%d) added to IOCP\n", lpPerSocketContext->Socket);

	return lpPerSocketContext;
}

//
//  Close down a connection with a client.  This involves closing the socket (when
//  initiated as a result of a CTRL-C the socket closure is not graceful).  Additionally,
//  any context data associated with that socket is free'd.
//
VOID CloseClient(PPER_SOCKET_CONTEXT lpPerSocketContext, BOOL bGraceful) {

	EnterCriticalSection(&g_CriticalSection);

	if (lpPerSocketContext) {
		myprintf("CloseClient: Socket(%d) connection closing (graceful=%s)\n", lpPerSocketContext->Socket, (bGraceful?"TRUE":"FALSE"));
		if (!bGraceful) {
			//
			// force the subsequent closesocket to be abortative.
			//
			LINGER  lingerStruct;

			lingerStruct.l_onoff = 1;
			lingerStruct.l_linger = 0;
			setsockopt(lpPerSocketContext->Socket, SOL_SOCKET, SO_LINGER, (char *)&lingerStruct, sizeof(lingerStruct));
		}
		if (lpPerSocketContext->pIOContext->SocketAccept != INVALID_SOCKET) {
			closesocket(lpPerSocketContext->pIOContext->SocketAccept);
			lpPerSocketContext->pIOContext->SocketAccept = INVALID_SOCKET;
		}

		closesocket(lpPerSocketContext->Socket);
		lpPerSocketContext->Socket = INVALID_SOCKET;
		CtxtListDeleteFrom(lpPerSocketContext);
		lpPerSocketContext = NULL;
	} else {
		myprintf("CloseClient: lpPerSocketContext is NULL\n");
	}

	LeaveCriticalSection(&g_CriticalSection);
	return;
}

//
// Allocate a socket context for the new connection.
//
PPER_SOCKET_CONTEXT CtxtAllocate(SOCKET sd, IO_OPERATION ClientIO) {

	PPER_SOCKET_CONTEXT lpPerSocketContext;

	EnterCriticalSection(&g_CriticalSection);

	lpPerSocketContext = (PPER_SOCKET_CONTEXT)xmalloc(sizeof(PER_SOCKET_CONTEXT));
	if (lpPerSocketContext) {
		lpPerSocketContext->pIOContext = (PPER_IO_CONTEXT)xmalloc(sizeof(PER_IO_CONTEXT));
		if (lpPerSocketContext->pIOContext) {
			lpPerSocketContext->Socket = sd;
			lpPerSocketContext->pCtxtBack = NULL;
			lpPerSocketContext->pCtxtForward = NULL;

			lpPerSocketContext->pIOContext->Overlapped.Internal = 0;
			lpPerSocketContext->pIOContext->Overlapped.InternalHigh = 0;
			lpPerSocketContext->pIOContext->Overlapped.Offset = 0;
			lpPerSocketContext->pIOContext->Overlapped.OffsetHigh = 0;
			lpPerSocketContext->pIOContext->Overlapped.hEvent = NULL;
			lpPerSocketContext->pIOContext->IOOperation = ClientIO;
			lpPerSocketContext->pIOContext->pIOContextForward = NULL;
			lpPerSocketContext->pIOContext->nTotalBytes = 0;
			lpPerSocketContext->pIOContext->nSentBytes  = 0;
			lpPerSocketContext->pIOContext->wsabuf.buf  = lpPerSocketContext->pIOContext->Buffer;
			lpPerSocketContext->pIOContext->wsabuf.len  = sizeof(lpPerSocketContext->pIOContext->Buffer);
			lpPerSocketContext->pIOContext->SocketAccept = INVALID_SOCKET;

			ZeroMemory(lpPerSocketContext->pIOContext->wsabuf.buf, lpPerSocketContext->pIOContext->wsabuf.len);
		} else {
			myprintf("HeapAlloc() PER_IO_CONTEXT failed: %d\n", GetLastError());
			xfree(lpPerSocketContext);
			//
			// do not hand a freed pointer back to the caller
			//
			lpPerSocketContext = NULL;
		}

	} else {
		//
		// fall through so the critical section is released before returning NULL
		//
		myprintf("HeapAlloc() PER_SOCKET_CONTEXT failed: %d\n", GetLastError());
	}

	LeaveCriticalSection(&g_CriticalSection);

	return(lpPerSocketContext);
}

//
//  Add a client connection context structure to the global list of context structures.
//
VOID CtxtListAddTo(PPER_SOCKET_CONTEXT lpPerSocketContext) {

	PPER_SOCKET_CONTEXT pTemp;

	EnterCriticalSection(&g_CriticalSection);

	if (g_pCtxtList == NULL) {
		//
		// add the first node to the linked list
		//
		lpPerSocketContext->pCtxtBack    = NULL;
		lpPerSocketContext->pCtxtForward = NULL;
		g_pCtxtList = lpPerSocketContext;
	} else {
		//
		// add node to head of list
		//
		pTemp = g_pCtxtList;

		g_pCtxtList = lpPerSocketContext;
		lpPerSocketContext->pCtxtBack    = pTemp;
		lpPerSocketContext->pCtxtForward = NULL;

		pTemp->pCtxtForward = lpPerSocketContext;
	}

	LeaveCriticalSection(&g_CriticalSection);
	return;
}

//
//  Remove a client context structure from the global list of context structures.
//
VOID CtxtListDeleteFrom(PPER_SOCKET_CONTEXT lpPerSocketContext) {

	PPER_SOCKET_CONTEXT pBack;
	PPER_SOCKET_CONTEXT pForward;
	PPER_IO_CONTEXT     pNextIO     = NULL;
	PPER_IO_CONTEXT     pTempIO     = NULL;

	EnterCriticalSection(&g_CriticalSection);

	if (lpPerSocketContext) {
		pBack       = lpPerSocketContext->pCtxtBack;
		pForward    = lpPerSocketContext->pCtxtForward;

		if (pBack == NULL && pForward == NULL) {

			//
			// This is the only node in the list to delete
			//
			g_pCtxtList = NULL;
		} else if (pBack == NULL && pForward != NULL) {

			//
			// This is the start node in the list to delete
			//
			pForward->pCtxtBack = NULL;
			g_pCtxtList = pForward;
		} else if (pBack != NULL && pForward == NULL) {

			//
			// This is the end node in the list to delete
			//
			pBack->pCtxtForward = NULL;
		} else if (pBack && pForward) {

			//
			// Neither start node nor end node in the list
			//
			pBack->pCtxtForward = pForward;
			pForward->pCtxtBack = pBack;
		}

		//
		// Free all i/o context structures per socket
		//
		pTempIO = (PPER_IO_CONTEXT)(lpPerSocketContext->pIOContext);
		while (pTempIO) {
			//
			// read the next pointer before the current node is freed
			//
			pNextIO = (PPER_IO_CONTEXT)(pTempIO->pIOContextForward);

			//
			//The overlapped structure is safe to free only when the posted i/o has
			//completed. Here we only need to test those posted but not yet dequeued
			//by GetQueuedCompletionStatus in the shutdown process.
			//
			if (g_bEndServer)
				while (!HasOverlappedIoCompleted((LPOVERLAPPED)pTempIO)) Sleep(0);
			xfree(pTempIO);
			pTempIO = pNextIO;
		}

		xfree(lpPerSocketContext);
		lpPerSocketContext = NULL;
	} else {
		myprintf("CtxtListDeleteFrom: lpPerSocketContext is NULL\n");
	}

	LeaveCriticalSection(&g_CriticalSection);
	return;
}

//
//  Free all context structure in the global list of context structures.
//
VOID CtxtListFree() {
	PPER_SOCKET_CONTEXT pTemp1, pTemp2;

	EnterCriticalSection(&g_CriticalSection);

	pTemp1 = g_pCtxtList;
	while (pTemp1) {
		pTemp2 = pTemp1->pCtxtBack;
		CloseClient(pTemp1, FALSE);
		pTemp1 = pTemp2;
	}

	LeaveCriticalSection(&g_CriticalSection);
	return;
}

int myprintf(const char *lpFormat, ...) {

	DWORD dwWritten = 0;
	char cBuffer[512];
	va_list arglist;
	HANDLE hOut = NULL;
	HRESULT hRet;

	ZeroMemory(cBuffer, sizeof(cBuffer));

	va_start(arglist, lpFormat);
	hRet = StringCchVPrintfA(cBuffer, sizeof(cBuffer), lpFormat, arglist);
	va_end(arglist);

	//
	// STRSAFE_E_INSUFFICIENT_BUFFER still leaves a truncated, null-terminated
	// string in cBuffer, so print in that case as well.
	//
	if (SUCCEEDED(hRet) || hRet == STRSAFE_E_INSUFFICIENT_BUFFER) {
		hOut = GetStdHandle(STD_OUTPUT_HANDLE);
		if (hOut != NULL && hOut != INVALID_HANDLE_VALUE)
			WriteConsoleA(hOut, cBuffer, (DWORD)lstrlenA(cBuffer), &dwWritten, NULL);
	}

	return (int)dwWritten;
}

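The ClientIoRead and ClientIoWrite cases in the worker thread come down to a small piece of bookkeeping. After a read completes, the number of bytes to echo is recorded. After each write completes, the sent counter is advanced, and either another send is posted for the remainder or a new receive is posted. Below is a minimal, portable C sketch of just that bookkeeping, with no WinSock calls. The `send_progress` type and its helper functions are hypothetical names introduced here for illustration; they stand in for the nTotalBytes / nSentBytes fields of the demo's per-I/O context and are not part of the official sample.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for the nTotalBytes / nSentBytes fields
 * of the demo's PER_IO_CONTEXT (illustration only). */
typedef struct {
    size_t total_bytes; /* bytes that must be echoed back */
    size_t sent_bytes;  /* bytes confirmed sent so far */
} send_progress;

/* Start tracking a new echo of `total` bytes
 * (what the ClientIoRead case does before posting WSASend). */
static void send_progress_begin(send_progress *p, size_t total) {
    p->total_bytes = total;
    p->sent_bytes = 0;
}

/* Record one write completion of `completed` bytes
 * (what the ClientIoWrite case does with dwIoSize).
 * Returns the number of bytes that still need another send;
 * 0 means everything went out and a new receive can be posted. */
static size_t send_progress_on_complete(send_progress *p, size_t completed) {
    assert(completed <= p->total_bytes - p->sent_bytes);
    p->sent_bytes += completed;
    return p->total_bytes - p->sent_bytes;
}

/* Offset into the buffer where the next send should start
 * (the demo computes Buffer + nSentBytes). */
static size_t send_progress_offset(const send_progress *p) {
    return p->sent_bytes;
}
```

In the demo, a non-zero remainder corresponds to the branch that fills buffSend and calls WSASend again, and a zero remainder corresponds to the branch that switches IOOperation back to ClientIoRead and calls WSARecv.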

END!!!

