WinSock I/O 模型 -- OVERLAPPED I/O 模型

Oct 16, 2021

目录


简介


OVERLAPPED I/O 模型也是 WinSock 中常见的异步 I/O 模型,相比于我们之前提到的 Select 模型WSAAsyncSelect 模型WSAEventSelect 模型有更好的性能.

为了方便描述,下文我们将称 Overlapped I/O 模型为 “重叠模型”.

重叠模型的基本设计原理便是让应用程序使用一个 重叠的数据结构(Overlapped),一次投递一个或多个 Winsock I/O 请求。针对那些提交的请求,在它们完成 之后,应用程序可为它们提供服务

使用这个模型,网络应用程序通过接收以 Windows 消息为基础的网络事件通知来处理网络请求。

这篇文章我们就来看看如何使用 重叠 I/O 相关的 api 来实现一个简单的 TCP 服务器.

这里我们介绍基于 Event 的实现.


API 基础


这里我们不再介绍 WSAEvent 类型相关的API,之前的文章中已经涉及过.

Overlapped 结构体

对于该结构体,官方的描述为: 一个包含异步输入输出任务信息的结构体

 1typedef struct _OVERLAPPED {
 2  ULONG_PTR Internal;
 3  ULONG_PTR InternalHigh;
 4  union {
 5    struct {
 6      DWORD Offset;
 7      DWORD OffsetHigh;
 8    } DUMMYSTRUCTNAME;
 9    PVOID Pointer;
10  } DUMMYUNIONNAME;
11  HANDLE    hEvent;
12} OVERLAPPED, *LPOVERLAPPED;

对于该结构体中的字段,我们这里不详细描述,因为大部分虽然当前官方文档中有详细描述,但是同时也声明了未来可能会改变,因此我们的应用程序不应该依赖于这些字段的任何特定值. 而是应该通过对应的 API 方法来获取自己感兴趣的信息.

使用是应该总是将所有字段置为 0 或这 NULL, 除了 hEvent 字段.

唯一非常重要的字段是: hEvent:一个 WSAEvent 事件的 handle. 当与当前 Overlapped 结构体关联的异步任务完成时,该 hEvent 会被触发.

WSAGetOverlappedResult

WSAGetOverlappedResult 用于获取某 SOCKET 异步任务的结果.

1BOOL WSAAPI WSAGetOverlappedResult(
2  SOCKET          s,
3  LPWSAOVERLAPPED lpOverlapped,
4  LPDWORD         lpcbTransfer,
5  BOOL            fWait,
6  LPDWORD         lpdwFlags
7);
  1. s: SOCKET s 为当通过特定 API(AcceptEx, ConnectEx, DisconnectEx, TransmitFile, TransmitPackets, WSARecv, WSARecvFrom, LPFN_WSARECVMSG (WSARecvMsg), WSASend, WSASendMsg, WSASendTo, 和 WSAIoctl) 添加这个异步任务时,这个异步任务所关联的 SOCKET。
  2. lpOverlapped: 一个 OVERLAPPED 结构体的指针,为添加该异步任务时所使用的 Overlapped 结构体. 该参数不能为 NULL.
  3. lpcbTransfer: 返回当前异步任务上已经传输的字节数(发送或者接收)。该参数不能为 NULL
  4. fWait:指定当前方法调用是否等待当前异步任务结束. 当指定为 TRUE时,该方法会一直阻塞直到当前异步任务完成. 当指定为 FALSE 时,如果当前异步任务还未完成,这个方法会返回 FALSE, 此时调用 WSAGetLastError 将会返回 WSA_IO_INCOMPLETE。
  5. lpdwFlags:略

AcceptEx

该 API 也可以在 重叠 I/O 模式下使用,并且该方法的性能高于传统的 accept 方法,这里我们为了简单,先不使用 AcceptEx 方法,在 IOCP 模式我们再介绍该方法.

WSARecv

WSARecv 用于从一个已经连接的 SOCKET 接收数据.

1int WSAAPI WSARecv(
2  SOCKET                             s,
3  LPWSABUF                           lpBuffers,
4  DWORD                              dwBufferCount,
5  LPDWORD                            lpNumberOfBytesRecvd,
6  LPDWORD                            lpFlags,
7  LPWSAOVERLAPPED                    lpOverlapped,
8  LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
9);
  1. s: SOCKET handle
  2. lpBufffers: 一个 WSABuf 结构体的数组. 该结构体比较简单,我们在实例小节描述其用法.
  3. dwBufferCount: lpBuffers 数组中元素的数量
  4. lpNumberOfBytesRecvd: 当此次方法调用,函数返回时已经成功的在 SOCKET 上读取到了数据,这个参数保存读取到的字节数. 当 lpOverlapped 参数不为空时,该参数可以为空.
  5. lpOverlapped: 与当前异步接收任务关联的 Overlapped 结构体.
  6. lpCompletionRoutine: 本文中我们使用基于事件的重叠I/O模型,因此我们不使用这个字段.
  7. 返回值: 如果当前读操作立马成功,返回值为 0. 否则,返回 SOCKET_ERROR. 具体的错误码通过 WSAGetLastError 获取。 如果具体的错误码为 WSA_IO_PENDING 表明当前异步任务已经成功提交,在该任务完成后 lpCompletionRoutine 会被调用或者 Overlapped 结构体中的 hEvent 事件会被触发。本文,我们将依赖于 hEvent 参数来处理异步完成的任务. 对于其他的错误码,请参考该 API 的官方文档.

WSASend 与 WSARecv 类似,我们不再赘述.


实现思路


  1. 创建一个 socket 作为监听 socket
  2. 创建子线程用于等待并处理异步 I/O 任务的结果。
  3. 在主线程中循环等待新连接的到来。注意,这里我们为了简单使用阻塞的 Accept 方法。 使用 AcceptEx 方法可以异步的来接收新的连接。 但是我们使用较简单的 Accept 方法.
  4. 在主线程中,当新连接到来,接收它,并为他创建对应的 OVERLAPPED 结构体和 WSAEvent 对象。将 WSAEvent 对象设置到 OVERLAPPED 对象的 hEvent 字段. 然后使用 WSARecv api 来从该客户端链接上接收数据. 注意该读不会阻塞主线程,它是异步的.
  5. 在子线程中,使用 WSAWaitForMultipleEvents 来等待我们所创建中的所有 Event 中任何一个被触发的事件. 否则阻塞子线程.
  6. 当有新的 event 被触发时,使用 WSAGetOverlappedResult 来获取当前任务的完成结果, 并处理它(一般都会再次提交新的异步 I/O 任务).

实例


  1#include <winsock2.h>
  2#include <windows.h>
  3#include <stdio.h>
  4
  5#define _WINSOCK_DEPRECATED_NO_WARNINGS
  6#pragma comment(lib,"ws2_32.lib")
  7
  8#define PORT         8080
  9#define DATA_BUFSIZE 8192
 10
 11typedef struct _SOCKET_CONTEXT {
 12   CHAR          Buffer[DATA_BUFSIZE];
 13   WSABUF        DataBuf;
 14   SOCKET        Socket;
 15   WSAOVERLAPPED Overlapped;
 16   DWORD         BytesSEND;
 17   DWORD         BytesRECV;
 18} SOCKET_CONTEXT, * LPSOCKET_CONTEXT;
 19
 20DWORD WINAPI ProcessIO(LPVOID lpParameter);
 21
 22DWORD            EventTotal = 0;
 23WSAEVENT         EventArray[WSA_MAXIMUM_WAIT_EVENTS];
 24LPSOCKET_CONTEXT SocketArray[WSA_MAXIMUM_WAIT_EVENTS];
 25CRITICAL_SECTION CriticalSection;
 26
 27int main() {
 28
 29  WSADATA           wsaData;
 30  SOCKET            ListenSocket, AcceptSocket;
 31  SOCKADDR_IN       Addr;
 32  DWORD             Flags;
 33  DWORD             ThreadId;
 34  DWORD             RecvBytes;
 35
 36  // 我们是多线程程序,锁是必不可少的
 37  InitializeCriticalSection(&CriticalSection);
 38
 39  if (WSAStartup(0x0202, &wsaData) != 0) {
 40    printf("WSAStartup() failed with error %d\n", WSAGetLastError());
 41    return 1;
 42  }
 43
 44  if ((ListenSocket = socket(AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET) {
 45    printf("socket() failed with error %d\n", WSAGetLastError());
 46    return 1;
 47  }
 48
 49  Addr.sin_family      = AF_INET;
 50  Addr.sin_addr.s_addr = htonl(INADDR_ANY);
 51  Addr.sin_port        = htons(PORT);
 52 
 53  if (bind(ListenSocket, (PSOCKADDR) &Addr, sizeof(Addr)) == SOCKET_ERROR) {
 54    printf("bind() failed with error %d\n", WSAGetLastError());
 55    return 1;
 56  }
 57
 58  if (listen(ListenSocket, 10)) {
 59    printf("listen() failed with error %d\n", WSAGetLastError());
 60    return 1;
 61  }
 62
 63  if ((AcceptSocket = WSASocketW(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED)) == INVALID_SOCKET) {
 64    printf("Failed to get a socket %d\n", WSAGetLastError());
 65    return 1;
 66  }
 67
 68  if ((EventArray[0] = WSACreateEvent()) == WSA_INVALID_EVENT) {
 69    printf("WSACreateEvent() failed with error %d\n", WSAGetLastError());
 70    return 1;
 71  }
 72
 73  // 创建子线程,用来处理异步任务的结果
 74  if (CreateThread(NULL, 0, ProcessIO, NULL, 0, &ThreadId) == NULL) {
 75    printf("CreateThread() failed with error %d\n", GetLastError());
 76    return 1;
 77  }
 78
 79  EventTotal = 1;
 80  while(TRUE) {
 81
 82    // 阻塞的接收新的客户端连接
 83    if ((AcceptSocket = accept(ListenSocket, NULL, NULL)) == INVALID_SOCKET) {
 84      printf("accept() failed with error %d\n", WSAGetLastError());
 85      return 1;
 86    }
 87
 88    EnterCriticalSection(&CriticalSection);
 89    // 新连接到来,为该新连接创建的必要的数据结构,维护该SOCKET的信息
 90    if ((SocketArray[EventTotal] = (LPSOCKET_CONTEXT) GlobalAlloc(GPTR, sizeof(SOCKET_CONTEXT))) == NULL) {
 91      printf("GlobalAlloc() failed with error %d\n", GetLastError());
 92      return 1;
 93    }
 94
 95    // 为该 SOCKE 创建关联的 OVERLAPPED 结构体,
 96    // 初始化 DataBuf 字段,无论我们是接收数据还是发送数据,我们都会使用它
 97    SocketArray[EventTotal]->Socket = AcceptSocket;
 98    ZeroMemory(&(SocketArray[EventTotal]->Overlapped), sizeof(OVERLAPPED));
 99    SocketArray[EventTotal]->BytesSEND = 0;
100    SocketArray[EventTotal]->BytesRECV = 0;
101    SocketArray[EventTotal]->DataBuf.len = DATA_BUFSIZE;
102    SocketArray[EventTotal]->DataBuf.buf = SocketArray[EventTotal]->Buffer;
103
104    // 初始化该 Overlapped 结构体的 hEvent 字段,我们异步完成时,我们便可以通过该事件得到通知
105    // 这样我们便不需要轮询该异步任务的结果,而是直接等到该 Event 被触发,然后区处理便可.
106    if ((SocketArray[EventTotal]->Overlapped.hEvent = EventArray[EventTotal] = WSACreateEvent()) == WSA_INVALID_EVENT) {
107      printf("WSACreateEvent() failed with error %d\n", WSAGetLastError());
108      return 1;
109    }
110
111    // 从该连接上读取数据. 
112    Flags = 0;
113    if (WSARecv(SocketArray[EventTotal]->Socket, &(SocketArray[EventTotal]->DataBuf), 1, &RecvBytes, &Flags, &(SocketArray[EventTotal]->Overlapped), NULL) == SOCKET_ERROR) {
114      if (WSAGetLastError() != ERROR_IO_PENDING) {
115        printf("WSARecv() failed with error %d\n", WSAGetLastError());
116        return 1;
117      }
118      // else 表示我们已经成功的提交了异步读任务,该任务目前还在进行中。
119      //  当它完成时,我们在子线程中处理
120    } 
121    // else 说明我们已经成功的读取到了数据,
122    // 读取到的数据存储在 DataBuf 中, 
123    // RecvBytes 存储接收到的数据长度
124
125    EventTotal++;
126    LeaveCriticalSection(&CriticalSection);
127    
128    if (WSASetEvent(EventArray[0]) == FALSE) {
129      printf("WSASetEvent() failed with error %d\n", WSAGetLastError());
130      return 1;
131    }
132   }
133}
134
135DWORD WINAPI ProcessIO(LPVOID lpParameter) {
136
137  DWORD Index;
138  DWORD Flags;
139  LPSOCKET_CONTEXT SocketContext;
140  DWORD BytesTransferred;
141  DWORD i;
142  DWORD RecvBytes, SendBytes;
143  
144  while(TRUE) {
145    // 等待我们提交的异步任务完成的事件
146    if ((Index = WSAWaitForMultipleEvents(EventTotal, EventArray, FALSE,  WSA_INFINITE, FALSE)) == WSA_WAIT_FAILED) {
147      printf("WSAWaitForMultipleEvents() failed %d\n", WSAGetLastError());
148      return 0;
149    }
150
151    if ((Index - WSA_WAIT_EVENT_0) == 0) {
152      WSAResetEvent(EventArray[0]);
153      continue;
154    }
155
156    SocketContext = SocketArray[Index - WSA_WAIT_EVENT_0];
157    WSAResetEvent(EventArray[Index - WSA_WAIT_EVENT_0]); // ResetEvent,以便后边重用该事件
158
159    // 获取当前完成的异步任务的结果
160    if (WSAGetOverlappedResult(SocketContext->Socket, &(SocketContext->Overlapped), &BytesTransferred, FALSE, &Flags) == FALSE || BytesTransferred == 0) {
161      printf("Closing socket %d\n", SocketContext->Socket);
162
163      if (closesocket(SocketContext->Socket) == SOCKET_ERROR) {
164        printf("closesocket() failed with error %d\n", WSAGetLastError());
165      } 
166
167      GlobalFree(SocketContext);
168      WSACloseEvent(EventArray[Index - WSA_WAIT_EVENT_0]);
169
170      // Cleanup SocketArray and EventArray by removing the socket event handle
171      // and socket information structure if they are not at the end of the array
172      EnterCriticalSection(&CriticalSection);
173
174      if ((Index - WSA_WAIT_EVENT_0) + 1 != EventTotal)
175        for (i = Index - WSA_WAIT_EVENT_0; i < EventTotal; i++) {
176          EventArray[i] = EventArray[i + 1];
177          SocketArray[i] = SocketArray[i + 1];
178        }
179      EventTotal--;
180      LeaveCriticalSection(&CriticalSection);
181      continue;
182    }
183    
184    if (SocketContext->BytesRECV == 0) {
185      SocketContext->BytesRECV = BytesTransferred;
186      SocketContext->BytesSEND = 0;
187    } else {
188      SocketContext->BytesSEND += BytesTransferred;
189    }
190
191    if (SocketContext->BytesRECV > SocketContext->BytesSEND) {
192      // 重置 Overlapped 结构体,我们要重用这个结构体,提交一个新的任务
193      ZeroMemory(&(SocketContext->Overlapped), sizeof(WSAOVERLAPPED));
194      SocketContext->Overlapped.hEvent = EventArray[Index - WSA_WAIT_EVENT_0];
195      SocketContext->DataBuf.buf = SocketContext->Buffer + SocketContext->BytesSEND;
196      SocketContext->DataBuf.len = SocketContext->BytesRECV - SocketContext->BytesSEND;
197
198      if (WSASend(SocketContext->Socket, &(SocketContext->DataBuf), 1, &SendBytes, 0, &(SocketContext->Overlapped), NULL) == SOCKET_ERROR) {
199        if (WSAGetLastError() != ERROR_IO_PENDING) {
200          printf("WSASend() failed with error %d\n", WSAGetLastError());
201          return 0;
202        }
203      }
204
205    } else {
206      SocketContext->BytesRECV = 0;
207      // Now that there are no more bytes to send post another WSARecv() request
208      Flags = 0;
209      ZeroMemory(&(SocketContext->Overlapped), sizeof(WSAOVERLAPPED));
210      SocketContext->Overlapped.hEvent = EventArray[Index - WSA_WAIT_EVENT_0];
211      SocketContext->DataBuf.len = DATA_BUFSIZE;
212      SocketContext->DataBuf.buf = SocketContext->Buffer;
213
214      if (WSARecv(SocketContext->Socket, &(SocketContext->DataBuf), 1, &RecvBytes, &Flags, &(SocketContext->Overlapped), NULL) == SOCKET_ERROR) {
215        if (WSAGetLastError() != ERROR_IO_PENDING) {
216          printf("WSARecv() failed with error %d\n", WSAGetLastError());
217          return 0;
218        }
219      }
220    }
221  }
222}

END !!!


Tags