WinSock I/O 模型 -- WSAEventSelect 模型

Oct 15, 2021

目录


简介


WSAEventSelect 模型也是 WinSock 中最常见的异步 I/O 模型。

这篇文章我们就来看看如何使用 WSAEventSelect api 来实现一个简单的 TCP 服务器.


API 基础


WSAEventSelect

WSAEventSelect 用来把一个 SOCKET 对象和一个 WSAEVENT 对象关联起来。 lNetworkEvents 表示我们关心的 FD_XXX 网络事件. 如果关心多个 SOCKET 事件,可以使用 OR 的方式指定多个 FD_XXX 标志。

1int WSAAPI WSAEventSelect(
2  SOCKET   s,
3  WSAEVENT hEventObject,
4  long     lNetworkEvents
5);

当特定的事件发生在相应的 SOCKET 上,该 SOCKET 上接下来的事件将会被阻塞,直到当前的事件被应用处理. 该事件被处理之后,接下来的事件将可以被进一步触发.

事件 处理函数
FD_READ recv, recvFrom, WSARecv, WSARecvEx, WSARecvFrom
FD_WRITE send, sendTo, WSASend, WSASentTo
FD_OOB recv, recvFrom, WSARecv, WSARecvEx, WSARecvFrom
FD_ACCEPT accept, AcceptEx, WSAAccept
FD_CONNECT None
FD_CLOSE None
FD_QOS WSAIoctl (with SIO_GET_QOS)
FD_GROUP_QOS Reserved
FD_ROUTINE_INTERFACE_CHANGE WSAIoctl (with SIO_ROUTINE_INTERFACE_CHANGE)
FD_ADDRESS_LIST_CHANGE WSAIoctl (with SIO_ADDRESS_LIST_CHANGE)

WSAEvent

WSACreateEvent 方法用来创建一个 WSAEvent 对象

1WSAEVENT WSAAPI WSACreateEvent();

WSAWaitForMultipleEvents 用于等待一组事件中的一个或全部被触发。

1DWORD WSAAPI WSAWaitForMultipleEvents(
2  DWORD          cEvents,
3  const WSAEVENT *lphEvents,
4  BOOL           fWaitAll,
5  DWORD          dwTimeout,
6  BOOL           fAlertable
7);
  • cEvents:指定 lphEvents 数组中事件对象的数量。 该参数的最大值是 WSA_MAXIMUM_WAIT_EVENTS (64)
  • lphEvents:事件对象的集合
  • fWaitAll: 指定等待 lphEvents 中所有事件被触发或者其中之一被触发。 如果指定为 TRUE, 那么该函数只有在所有事件对象都被触发之后才会返回。 如果指定为 FALSE, 当事件集合中任何一个事件被触发之后,该方法就会返回。如果在这种情况下有多个事件对象被触发,那个返回值将会返回该事件集合中索引值最小的索引值. 索引值减去 WSA_WAIT_EVENT_0 便是指向 lphEvents 中被触发的事件的索引值.
  • dwTimeout: 如果在 timeout 事件间隔内,没有事件被触发,函数不会一直阻塞,而是在等待 timeout 毫秒后返回。 指定该参数为 WSA_INFINITE, 该函数会一直等待,直到有事件被触发. 指定该参数为 0, 该函数会立即返回.
  • fAlertable: 略

WSAEnumNetworkEvents 用于查询当前 SOCKET 上触发事件对象 hEventObject 的对应 socket 事件(FD_READ, FD_WRITE 等).

1int WSAAPI WSAEnumNetworkEvents(
2  SOCKET             s,
3  WSAEVENT           hEventObject,
4  LPWSANETWORKEVENTS lpNetworkEvents
5);

实现思路


  1. 创建一个 socket 作为监听 socket
  2. 使用 WSAEventSelect 监听该 SOCKET 上的网络事件
  3. 使用 WSAWaitForMultipleEvents 等待 SOCKET 事件
  4. 当 SOCKET 上有事件被触发,使用 WSAEnumNetworkEvents 查询具体的 SOCKET 事件,并使用相应的 API 处理事件.
  5. 当有新的 SOCKET 连接到来,接收该连接,重复 2-4 步骤.

实例

接下来我们通过一个实例来看看如何实现.

  1#include <winsock2.h>
  2#include <windows.h>
  3#include <stdio.h>
  4
  5#pragma comment(lib,"ws2_32.lib")
  6
  7#define PORT 8080
  8#define DATA_BUFSIZE 8192
  9 
 10typedef struct _SOCKET_CONTEXT {
 11   CHAR   Buffer[DATA_BUFSIZE];
 12   WSABUF DataBuf;
 13   SOCKET Socket;
 14   DWORD  BytesSEND;
 15   DWORD  BytesRECV;
 16} SOCKET_CONTEXT, * LPSOCKET_CONTEXT;
 17
 18BOOL CreateSocketInformation(SOCKET s);
 19void FreeSocketInformation(DWORD Event);
 20
 21// 这里我们维护了如下数据结构:
 22//     EeventArray: 我们为每个 SOCKET 对象创建一个对应的事件对象,以便我们能监听该 SOCKET 上的网络事件
 23//     SocketArray: 毫无疑问,我们也需要维护所有SOCKET连接的的数组。其中包含 Listen Socket
 24
 25DWORD            EventTotal = 0;
 26WSAEVENT         EventArray[WSA_MAXIMUM_WAIT_EVENTS];
 27LPSOCKET_CONTEXT SocketArray[WSA_MAXIMUM_WAIT_EVENTS];
 28
 29int main() {
 30
 31  SOCKET           ListenSocket;
 32  SOCKET           AcceptSocket;
 33  SOCKADDR_IN      Addr;
 34  LPSOCKET_CONTEXT SocketContext;
 35  WSANETWORKEVENTS NetworkEvents;
 36  DWORD            Event;
 37  WSADATA          wsaData;
 38  DWORD            Flags;
 39  DWORD            RecvBytes;
 40  DWORD            SendBytes;
 41
 42  // 初始化 Listen Socket 对象
 43  if (WSAStartup(0x0202, &wsaData) != 0) {
 44    printf("WSAStartup() failed with error %d\n", WSAGetLastError());
 45    return 1;
 46  }
 47
 48  if ((ListenSocket = socket(AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET) {
 49    printf("socket() failed with error %d\n", WSAGetLastError());
 50    return 1;
 51  }
 52
 53  if (CreateSocketInformation(ListenSocket) == FALSE) {
 54    printf("CreateSocketInformation() failed!\n");
 55    return 1;
 56  }
 57
 58  // 在调用 listen api 之前,我们需要使用 WSAEventSelect 需要将 ListenSocket 与一个 WSAEvent 对象关联起来,这里我们仅仅关系 FD_ACCEPT 和 FS_CLOSE 事件.
 59  // 当这两个事件之一被触发,我们编译可以从 EventArray[0] 上查询到这些实际,以便进行处理
 60  if (WSAEventSelect(ListenSocket, EventArray[EventTotal - 1], FD_ACCEPT | FD_CLOSE) == SOCKET_ERROR) {
 61    printf("WSAEventSelect() failed with error %d\n", WSAGetLastError());
 62    return 1;
 63  }
 64
 65  Addr.sin_family      = AF_INET;
 66  Addr.sin_addr.s_addr = htonl(INADDR_ANY);
 67  Addr.sin_port        = htons(PORT);
 68 
 69  if (bind(ListenSocket, (PSOCKADDR) &Addr, sizeof(Addr)) == SOCKET_ERROR) {
 70    printf("bind() failed with error %d\n", WSAGetLastError());
 71    return 1;
 72  }
 73
 74  if (listen(ListenSocket, 10)) {
 75    printf("listen() failed with error %d\n", WSAGetLastError());
 76    return 1;
 77  }
 78
 79  while(TRUE) {
 80
 81    // 等待当前所有 socket 上的网络事件被触发
 82    //
 83    // 这里我们 fWait = FALSE, 也就是说任何一个 SOCKET 上有网络之间,
 84    // 我们便停止等待,开始处理该事件
 85    //
 86    // dwTimeout = WSA_INFINITE, 如果没有网络事件发生,我们就一直等待,
 87    // 直到有网络事件发生
 88    //
 89    // 这里 EventTotal 会随着客户端连接的到来增加,同时我们会创建对应的 Event对象,
 90    // 并放入 EventArray
 91    if ((Event = WSAWaitForMultipleWSAWaitForEvents(EventTotal, EventArray, FALSE, WSA_INFINITE, FALSE)) == WSA_WAIT_FAILED) {
 92      printf("WSAWaitForMultipleEvents() failed with error %d\n", WSAGetLastError());
 93      return 1;
 94    }
 95
 96    // 程序运行到这里,已经有网络事件发生了,我们使用WSAEnumNetworkEvents 查询到底是什么网络事件, 
 97    // 查询结果保存在 NetworkEvents 对象上
 98    // 注意,在 API 章节我们已经说明,WSAWaitForMultipleWSAWaitForEvents 的返回值减去 WSA_WAIT_EVENT_0 才是对应的 EventArray 中被触发的事件的索引值
 99    if (WSAEnumNetworkEvents(
100        SocketArray[Event - WSA_WAIT_EVENT_0]->Socket,
101        EventArray[Event - WSA_WAIT_EVENT_0], 
102        &NetworkEvents) == SOCKET_ERROR
103    ) {
104      printf("WSAEnumNetworkEvents() failed with error %d\n", WSAGetLastError());
105      return 1;
106    }
107    
108    // 检查当前事件是否是 FD_ACCEPT
109    // 如果是 FD_ACCEPT事件,使用 accept 接收新的连接。
110    if (NetworkEvents.lNetworkEvents & FD_ACCEPT) {
111      if (NetworkEvents.iErrorCode[FD_ACCEPT_BIT] != 0) {
112        printf("FD_ACCEPT failed with error %d\n", NetworkEvents.iErrorCode[FD_ACCEPT_BIT]);
113        break;
114      }
115
116      if ((AcceptSocket = accept(SocketArray[Event - WSA_WAIT_EVENT_0]->Socket, NULL, NULL)) == INVALID_SOCKET) {
117        printf("accept() failed with error %d\n", WSAGetLastError());
118        break;
119      }
120
121      if (EventTotal > WSA_MAXIMUM_WAIT_EVENTS) {
122        printf("Too many connections - closing socket...\n");
123        closesocket(AcceptSocket);
124        break;
125      }
126
127      // 接收新的连接之后,为该 SOCKET 创建 WSAEvent对象(在CreateSocketInformation 实现)
128      // 然后监听该 SOCKET 的 FD_READ, FD_WRITE, FD_CLOSE 事件
129      CreateSocketInformation(AcceptSocket);
130      if (WSAEventSelect(AcceptSocket, EventArray[EventTotal - 1], FD_READ|FD_WRITE|FD_CLOSE) == SOCKET_ERROR) {
131        printf("WSAEventSelect() failed with error %d\n", WSAGetLastError());
132        return 1;
133      }
134
135      printf("Socket %d got connected...\n", AcceptSocket);
136    }
137
138    // 检查当前事件是否是 FD_READ 或者 FD_WRITE
139    if (NetworkEvents.lNetworkEvents & FD_READ || NetworkEvents.lNetworkEvents & FD_WRITE) {
140
141      // 检查是不是发生了读错误
142      if (NetworkEvents.lNetworkEvents & FD_READ && NetworkEvents.iErrorCode[FD_READ_BIT] != 0) {
143        printf("FD_READ failed with error %d\n", NetworkEvents.iErrorCode[FD_READ_BIT]);
144        break;
145      }
146
147      // 检查是不是发生了写错误
148      if (NetworkEvents.lNetworkEvents & FD_WRITE && NetworkEvents.iErrorCode[FD_WRITE_BIT] != 0) {
149        printf("FD_WRITE failed with error %d\n", NetworkEvents.iErrorCode[FD_WRITE_BIT]);
150        break;
151      }
152
153      SocketContext = SocketArray[Event - WSA_WAIT_EVENT_0];
154
155      // Read data only if the receive buffer is empty
156      if (SocketContext->BytesRECV == 0) {
157        SocketContext->DataBuf.buf = SocketContext->Buffer;
158        SocketContext->DataBuf.len = DATA_BUFSIZE;
159
160        Flags = 0;
161        if (WSARecv(SocketContext->Socket, &(SocketContext->DataBuf), 1, &RecvBytes, &Flags, NULL, NULL) == SOCKET_ERROR) {
162          if (WSAGetLastError() != WSAEWOULDBLOCK) {
163            printf("WSARecv() failed with error %d\n", WSAGetLastError());
164            FreeSocketInformation(Event - WSA_WAIT_EVENT_0);
165            return 1;
166          }
167        } else {
168          printf("WSARecv() is working!\n");
169          SocketContext->BytesRECV = RecvBytes;
170        }
171      }
172
173      if (SocketContext->BytesRECV > SocketContext->BytesSEND) {
174
175        SocketContext->DataBuf.buf = SocketContext->Buffer + SocketContext->BytesSEND;
176        SocketContext->DataBuf.len = SocketContext->BytesRECV - SocketContext->BytesSEND;
177
178        if (WSASend(SocketContext->Socket, &(SocketContext->DataBuf), 1, &SendBytes, 0, NULL, NULL) == SOCKET_ERROR) {
179
180          if (WSAGetLastError() != WSAEWOULDBLOCK) {
181            printf("WSASend() failed with error %d\n", WSAGetLastError());
182            FreeSocketInformation(Event - WSA_WAIT_EVENT_0);
183            return 1;
184          }
185          // A WSAEWOULDBLOCK error has occurred. An FD_WRITE event will be posted
186          // when more buffer space becomes available
187        } else {
188
189          printf("WSASend() is fine! Thank you...\n");
190          SocketContext->BytesSEND += SendBytes;
191
192          if (SocketContext->BytesSEND == SocketContext->BytesRECV) {
193            SocketContext->BytesSEND = 0;
194            SocketContext->BytesRECV = 0;
195          }
196        }
197      }
198    }
199
200    // 检查当前事件是否是 FD_CLOSE
201    if (NetworkEvents.lNetworkEvents & FD_CLOSE) {
202      // 检查是否发生了错误
203      if (NetworkEvents.iErrorCode[FD_CLOSE_BIT] != 0) {
204        printf("FD_CLOSE failed with error %d\n", NetworkEvents.iErrorCode[FD_CLOSE_BIT]);
205        break;
206      } else {
207        // socket 正常关闭
208        printf("FD_CLOSE is OK!\n");
209      }
210
211      printf("Closing socket information %d\n", SocketArray[Event - WSA_WAIT_EVENT_0]->Socket);
212      FreeSocketInformation(Event - WSA_WAIT_EVENT_0);
213    }
214  }
215  return 0;
216}
217
218 
219BOOL CreateSocketInformation(SOCKET s) {
220  LPSOCKET_CONTEXT SocketContext;
221
222  if ((EventArray[EventTotal] = WSACreateEvent()) == WSA_INVALID_EVENT) {
223    printf("WSACreateEvent() failed with error %d\n", WSAGetLastError());
224    return FALSE;
225  }
226
227  if ((SocketContext = (LPSOCKET_CONTEXT) GlobalAlloc(GPTR, sizeof(SOCKET_CONTEXT))) == NULL) {
228    printf("GlobalAlloc() failed with error %d\n", GetLastError());
229    return FALSE;
230  }
231
232  // Prepare SocketInfo structure for use
233  SocketContext->Socket = s;
234  SocketContext->BytesSEND = 0;
235  SocketContext->BytesRECV = 0;
236
237  SocketArray[EventTotal] = SocketContext;
238  EventTotal++;
239  return TRUE;
240}
241
242void FreeSocketInformation(DWORD Event) {
243
244  LPSOCKET_CONTEXT SocketContext = SocketArray[Event];
245  DWORD i;
246
247  closesocket(SocketContext->Socket);
248  GlobalFree(SocketContext);
249 
250  if(WSACloseEvent(EventArray[Event]) == TRUE) {
251    printf("WSACloseEvent() is OK!\n\n");
252  }
253
254  // Squash the socket and event arrays
255  for (i = Event; i < EventTotal; i++) {
256    EventArray[i] = EventArray[i + 1];
257    SocketArray[i] = SocketArray[i + 1];
258  }
259  EventTotal--;
260}

END!!!


Tags