WinSock I/O 模型 -- Select 模型

Oct 15, 2021

目录


简介


Select 模型是 WinSock 中最常见的 I/O 模型,这篇文章我们就来看看如何使用 Select api 来实现一个简单的 TCP 服务器.


API 基础


Select 模型依赖 WinSock API Select 来检查当前 Socket 是否可写或者可读。

使用这个 API 的优点是我们不需要使用阻塞的 Socket API (recv, send) 来等待 Socket 状态准备就绪,我们可以异步的检查 Socket 的状态来进行读数据或者写数据.

Select 方法的声明如下:

1int WSAAPI select(
2  int           nfds,
3  fd_set        *readfds,
4  fd_set        *writefds,
5  fd_set        *exceptfds,
6  const timeval *timeout
7);

其中:

  • nfds: 直接忽略即可,该参数的设计是为了兼容 Berkeley Socket 的实现
  • redfds: 返回值,当前可读的 socket 的集合
  • writefds: 返回值,当前可写的 socket 的集合
  • exceptfds:返回值,当前发生错误的 socket 的集合
  • 返回值: 表示当前准备就绪的 socket 的数量。 这里的准备就绪包含 可读,可写,或者储出错的socket。如果返回 SOCKET_ERROR,表示发生错误,可以使用 WSAGetLastError 来获取具体的错误码。

fd_set

fd_set 是一个 socket 的集合,作为 select 方法的输入输出参数.

这里使用到的操作包括:

  • FD_ZERO : 重置 fd_set
  • FD_SET: 将 socket handle 添加到当前 fd_set
  • FD_ISSET: 检查某个 socket handle 是否处于当前 fd_set

实现思路


  1. 创建一个 socket 作为监听 socket,并将该 socket 设置为非阻塞模式.
  2. 使用 select api 来非阻塞的简单该监听socket 是否有新连接进来。如果有,则调用 accept 来接收该 client socket
  3. 对于已经与客户段建立的连接,同样的设置为非阻塞模式,使用 select api 来检查该 socket 上是否有数据可读,或者该 socket 是否可写,以便往客户端发送数据。还需要检查socket 是否出错,本文的例子里忽略这点,思路是一样的。
  4. 注意,这里所有的操作都是非阻塞的。

解析来我们通过一个例子看看如何使用 Select.

实例

本文的例子可以直接拷贝运行。 读者如果不需要运行,直接注意加注释的代码段即可.

服务器实现

  1#include <WinSock2.h>
  2#include <Windows.h>
  3#include <stdio.h>
  4
  5#pragma comment(lib, "ws2_32")
  6
  7#define DEFALT_PORT 8080
  8#define DATA_BUFFER 8192
  9
 10typedef struct _SOCKET_CONTEXT {
 11  SOCKET     Socket;
 12  WSABUF     DataBuf;
 13  OVERLAPPED Overlapped;
 14  CHAR       Buffer[DATA_BUFFER];
 15  DWORD      BytesSEND;
 16  DWORD      BytesRECV;
 17} SOCKET_CONTEXT, * LPSOCKET_CONTEXT;
 18
 19BOOL CreateSocketContext(SOCKET s);
 20void FreeSocketContext(DWORD Index);
 21
 22DWORD TotalSockets = 0;
 23LPSOCKET_CONTEXT SocketArray[FD_SETSIZE];
 24
 25int main() {
 26  INT Ret;
 27  WSADATA wsaData;
 28  SOCKET ListenSocket;
 29  SOCKET AcceptSocket;
 30  SOCKADDR_IN Addr;
 31  ULONG NonBlock = 1;
 32  FD_SET ReadSet;
 33  FD_SET WriteSet;
 34  DWORD Total;
 35  DWORD Flags;
 36  DWORD RecvBytes;
 37  DWORD SentBytes;
 38  DWORD i;
 39
 40  if ((Ret = WSAStartup(0x0202, &wsaData)) != 0) {
 41    printf("WSAStartup failed with error %d\n", Ret);
 42    WSACleanup();
 43    return 1;
 44  }
 45
 46  if ((ListenSocket = WSASocketW(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED)) == INVALID_SOCKET) {
 47    printf("WSASocket failed with error %d\n", WSAGetLastError());
 48    return 1;
 49  }
 50
 51  Addr.sin_family = AF_INET;
 52  Addr.sin_addr.s_addr= htonl(INADDR_ANY);
 53  Addr.sin_port = htons(DEFALT_PORT);
 54
 55  if (bind(ListenSocket, (PSOCKADDR) &Addr, sizeof(Addr)) == SOCKET_ERROR) {
 56    printf("bind failed with error %d\n", WSAGetLastError());
 57    return 1;
 58  }
 59
 60  if (listen(ListenSocket, 10)) {
 61    printf("listen failed with eror %d\n", WSAGetLastError());
 62    return 1;
 63  }
 64
 65  // 设置监听socket为异步模式
 66  if (ioctlsocket(ListenSocket, FIONBIO, &NonBlock) == SOCKET_ERROR) {
 67    printf("ioctlsocket failed with error %d\n", WSAGetLastError());
 68    return 1;
 69  }
 70
 71  while (TRUE) {
 72    // 清空 ReadSet 和 WriteSet,我们将该集合中放入我们关心的 socket handle
 73    FD_ZERO(&ReadSet);
 74    FD_ZERO(&WriteSet);
 75
 76    // 将监听socket 放入 ReadSet, 以便当有新连接到来的时候,我们可以检查到该事件
 77    FD_SET(ListenSocket, &ReadSet);
 78
 79    // 我们同时也关心已经建立的客户段连接的可读可写状态,以便我们从客户端接收数据或者写数据
 80    // 这里一些小逻辑,直接忽略
 81    for (i = 0; i < TotalSockets; i++) {
 82      if (SocketArray[i]->BytesRECV > SocketArray[i]->BytesSEND) {
 83        FD_SET(SocketArray[i]->Socket, &WriteSet);
 84      } else {
 85        FD_SET(SocketArray[i]->Socket, &ReadSet);
 86      }
 87    }
 88
 89    // 使用 select 检查当前 ReadSet 和 WriteSet 中的socket 是否有新的事件到来
 90    if ((Total = select(0, &ReadSet, &WriteSet, NULL, NULL)) == SOCKET_ERROR) {
 91      printf("select failed with error %d\n", WSAGetLastError());
 92      return 1;
 93    }
 94
 95    // 使用 FD_ISSET 判断监听 socket 是否可以读,也就是说有新的连接到来
 96    // 如果有,调用 accept 来接收该新连接
 97    if (FD_ISSET(ListenSocket, &ReadSet)) {
 98      Total--;
 99      if ((AcceptSocket = accept(ListenSocket, NULL, NULL)) != INVALID_SOCKET) {
100        
101        NonBlock = 1;
102        if (ioctlsocket(AcceptSocket, FIONBIO, &NonBlock) == SOCKET_ERROR) {
103          printf("ioctlsocket failed with error %d\n", WSAGetLastError());
104          return 1;
105        }
106
107        if (CreateSocketContext(AcceptSocket) == FALSE) {
108          printf("CreateSocketContext failed");
109          return 1;
110        }
111      } else {
112        if (WSAGetLastError() != WSAEWOULDBLOCK) {
113          printf("accept failed with error %d\n", WSAGetLastError());
114          return 1;
115        } else {
116          printf("accept returns WSAEWOULDBLOCK\n");
117        }
118      }
119    }
120
121    // 接下来检查可读的客户段连接
122    for (i = 0; Total > 0 && i < TotalSockets; i++) {
123      LPSOCKET_CONTEXT Ctx = SocketArray[i];
124
125      if (FD_ISSET(Ctx->Socket, &ReadSet)) {
126        Total--;
127        Ctx->DataBuf.buf = Ctx->Buffer;
128        Ctx->DataBuf.len = DATA_BUFFER;
129        
130        //当前 socket 可读,那么调用 WSARecv 从该 socket 读取数据
131        // 如果 WSARecv 返回 0, 是说该连接已经断开
132        Flags = 0;
133        if (WSARecv(Ctx->Socket, &(Ctx->DataBuf), 1, &RecvBytes, &Flags, NULL, NULL) == SOCKET_ERROR) {
134          if (WSAGetLastError() != WSAEWOULDBLOCK) {
135            printf("WSARecv failed with error %d\n", WSAGetLastError());
136            FreeSocketContext(i);
137          } else {
138            printf("WSARecv returns WSAEWOULDBLOCK");
139          }
140          continue;
141        } else {
142          Ctx->BytesRECV = RecvBytes;
143
144          // If zero bytes are received, this indicates the peer closed the connection.
145          if (RecvBytes == 0) {
146            FreeSocketContext(i);
147            continue;
148          } else {
149            printf("Recv %d bytes data from the socket %d\n", RecvBytes, Ctx->Socket);
150          }
151        }
152      }
153      
154      // 接下来检查可写的客户段连接
155      if (FD_ISSET(Ctx->Socket, &WriteSet)) {
156        Total--;
157        Ctx->DataBuf.buf = Ctx->Buffer + Ctx->BytesSEND;
158        Ctx->DataBuf.len = Ctx->BytesRECV - Ctx->BytesSEND;
159
160        if (WSASend(Ctx->Socket, &(Ctx->DataBuf), 1, &SentBytes, 0, NULL, NULL) == SOCKET_ERROR) {
161          if (WSAGetLastError() != WSAEWOULDBLOCK) {
162            printf("WSASend failed with error %d\n", WSAGetLastError());
163            FreeSocketContext(i);
164          } else {
165            printf("WSASend returns WSAEWOULDBLOCK");
166          }
167          continue;
168
169        } else {
170          Ctx->BytesSEND += SentBytes;
171          if (Ctx->BytesSEND == Ctx->BytesRECV) {
172            Ctx->BytesSEND = 0;
173            Ctx->BytesRECV = 0;
174          }
175        }
176      }
177    }
178
179  }
180}
181
182BOOL CreateSocketContext(SOCKET s) {
183  LPSOCKET_CONTEXT Ctx;
184  printf("Accepted a new socket %d\n", s);
185
186  if ((Ctx = (LPSOCKET_CONTEXT) GlobalAlloc(GPTR, sizeof(SOCKET_CONTEXT))) == NULL) {
187    printf("GlobalAlloc() failed with error %d\n", GetLastError());
188    return FALSE;
189  }
190
191  Ctx->Socket = s;
192  Ctx->BytesSEND = 0;
193  Ctx->BytesRECV = 0;
194  SocketArray[TotalSockets] = Ctx;
195  TotalSockets++;
196
197  return TRUE;
198}
199
200void FreeSocketContext(DWORD Index) {
201  DWORD i;
202  LPSOCKET_CONTEXT Ctx = SocketArray[Index];
203  printf("Closing socket %d\n", Ctx->Socket);
204
205  closesocket(Ctx->Socket);
206  GlobalFree(Ctx);
207
208  for (i = Index; i < TotalSockets; i++) {
209    SocketArray[i] = SocketArray[i + 1];
210  }
211  TotalSockets--;
212}

客户端实现

搭配该服务器,使用下面 client 实现进行测试。 这里仅仅做测试用,忽略了大部分的错误检查.

 1#include <winsock2.h>
 2#include <stdio.h>
 3#include <stdlib.h>
 4
 5#define DEFAULT_COUNT       20
 6#define DEFAULT_PORT        8080
 7#define DEFAULT_BUFFER      2048
 8#define DEFAULT_MESSAGE     "\'A test message from client\'"
 9
10#pragma warning(disable:4996) 
11#pragma comment(lib, "ws2_32")
12
13char szMessage[1024];
14char szServer[128];
15
16int main(int argc, char **argv) {
17
18  WSADATA       wsaData;
19  SOCKET        ClientSocket;
20  char          szBuffer[DEFAULT_BUFFER];
21  int           ret, i;
22  SOCKADDR_IN   ServerAddr;
23  struct hostent    *host = NULL;
24
25  WSAStartup(0x0202, &wsaData);
26
27  strcpy_s(szMessage, sizeof(szMessage), DEFAULT_MESSAGE);
28  strcpy_s(szServer, sizeof(szServer), "127.0.0.1");
29
30  ClientSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
31
32  ServerAddr.sin_family = AF_INET;
33  ServerAddr.sin_port = htons(DEFAULT_PORT);
34  ServerAddr.sin_addr.s_addr = inet_addr(szServer);
35
36  if (connect(ClientSocket, (struct sockaddr *) &ServerAddr, sizeof(ServerAddr)) == SOCKET_ERROR) {
37    printf("connect failed with error %d\n", WSAGetLastError());
38    return 1;
39  }
40
41  printf("Sending and receiving data if any...\n");
42
43  for(i = 0; i < DEFAULT_COUNT; i++) {
44
45    if ((ret = send(ClientSocket, szMessage, strlen(szMessage), 0)) == SOCKET_ERROR) {
46      printf("send() failed with error %d\n", WSAGetLastError());
47      break;
48    }
49    printf("send() is OK. Send %d bytes: %s\n", ret, szBuffer);
50
51    if ((ret = recv(ClientSocket, szBuffer, DEFAULT_BUFFER, 0)) == SOCKET_ERROR) {
52      printf("recv() failed with error %d\n", WSAGetLastError());
53      break;
54    }
55    if (ret == 0) {
56      printf("It is a graceful close!\n");
57      break;
58    }
59
60    szBuffer[ret] = '\0';
61    printf("recv() is OK. Received %d bytes: %s\n", ret, szBuffer);
62  }
63
64  if(closesocket(ClientSocket) == 0) {
65    printf("closesocket() is OK!\n");
66  } else {
67    printf("closesocket() failed with error %d\n", WSAGetLastError());
68  }
69
70  WSACleanup();
71  return 0;
72}

END!!!


Tags