Chapter 21 - Asynchronous Notification IO 모델

21-1 : 비동기(Asynchronous) Notification IO 모델의 이해

send()recv() 함수는 동기(Synchronous)화된 입출력 함수

  • send() 함수가 호출되는 순간부터 데이터의 전송이 시작되고, send() 함수의 호출이 완료(반환)되는 순간 데이터의 전송이 완료됨
  • recv() 함수가 호출되는 순간부터 데이터의 수신이 시작되고, recv() 함수의 호출이 완료(반환)되는 순간 데이터의 수신이 완료됨
  • 즉, 입출력 함수의 반환시점과 데이터 송수신의 완료시점이 일치

동기화된 입출력은 입출력이 진행되는 동안 호출된 함수가 반환을 하지 않는다는 단점이 존재함
따라서 보다 효율적으로 CPU를 활용하기 위해서는 비동기 방식을 사용해야함


동기와 비동기 개념은 입출력 뿐만 아니라 Notification IO에도 존재

  • Notification IO : 입력버퍼에 데이터가 수신되어 데이터의 수신이 필요하거나, 출력버퍼가 비어 데이터의 전송이 가능한 상황의 알림
  • 대표적인 Notification IO 방식에는 select 방식이 존재
  • select()는 호출된 함수의 반환 시점이 IO가 필요하거나 가능한 상황이 발생한(IO 관련 이벤트가 발생한) 시점과 일치하기 때문에 동기 방식임
  • 이와달리 IO의 상태에 상관없이 반환이 이루어지면 비동기 방식이며, WSAEventSelect() 함수가 해당됨
  • 비동기 Notification IO의 경우 IO의 상태변화를 별도로 관찬ㄹ해야함

21-2 : 비동기(Asynchronous) Notification IO 모델의 이해와 구현

비동기 Notification IO 모델 구현에는 WSAEventSelect() 함수를 사용하는 방법과 WSAAsyncSelect() 함수를 사용하는 방법이 존재
단, WSAAsyncSelect() 함수를 사용하기 위해서는 발생한 이벤트를 수신할 윈도우의 핸들을 지정해야함

IO의 상태변화에는 소켓의 상태변화와 소켓의 이벤트 발생이 있음
WSAEventSelect() 함수는 임의의 소켓을 대상으로 이벤트 발생여부의 관찰을 명령할 때 사용

#include <winsock2.h>

int WSAEventSelect(SOCKET s, WSAEVENT hEventObject, long lNetworkEvents);
// 성공시 0, 실패시 SOCKET_ERROR 반환
  • hEventObject : 이벤트 발생유무의 확인을 위한 Event 오브젝트 핸들 전달
  • lNetworkEvents : 감시하고자 하는 이벤트의 유형에 대한 정보
    • FD_READ : 수신할 데이터 존재여부
    • FD_WRITE : 블로킹 없이 데이터 전송 가능 여부
    • FD_OOB : Out-of-band 데이터 수신 여부
    • FD_ACCEPT : 연결요청 여부
    • FD_CLOSE : 연결종료 요청 여부

WSAEventSelect() 함수는 지정한 이벤트 중 하나가 소켓에서 발생하면 Event 오브젝트를 signaled 상태로 바꿈

  • 즉, Event 오브젝트와 소켓을 연결하는 역할을 함
  • 비동기 방식이기 때문에 이베트 발생유무에 상관없이 바로 반환하며, 따라서 즉시 다른 작업을 진행할 수 있음
  • WSAEventSelect() 함수호출을 통해 전달된 소켓의 정보는 운영체제에 등록되기 때문에 추후 재호출이 불필요함


WSAEventSelect()에서 사용되는 Event 오브젝트는 manual-reset 모드이면서 non-signaled 상태이기 때문에 CreateEvent() 함수가 아닌 WSACreateEvent() 함수를 사용하여 생성하는 것이 편리

#include <winsock2.h>

WSAEVENT WSACreateEvent(void);
// 성공시 Event 오브젝트 핸들, 실패시 WSA_INVALID_EVENT 반환
// 반환형인 `WSAEVENT`는 내부적으로 `HANDLE`로 정의     

Event 오브젝트의 종료에는 WSACloseEvent() 함수를 사용

#include <winsock2.h>

BOOL WSACloseEvent(WSAEVENT hEvent);
// 성공시 TRUE, 실패시 FALSE 반환


이벤트의 발생유무 확인에는 WSAWaitForMultipleEvents() 함수를 사용

#include <winsock2.h>

DWORD WSAWaitForMultipleEvents(DWORD cEvents, const WSAEVENT *lphEvents, BOOL fwaitAll, DWORD dwTimeout, BOOL fAlertable);
// 성공시 이벤트 발생 오브젝트 관련정보, 실패시 WSA_INVALID_EVENT 반환
  • cEvents : 확인할 Event 오브젝트의 개수 정보
  • lphEvents : Event 오브젝트 핸들의 배열 주소값
  • fWaitAll : TRUE일시 모든 Event 오브젝트가 signaled일 때, FALSE일시 하나라도 signaled일 때 반환
  • dwTimeout : 1/1000초 단위로 타임아웃 지정, WSA_INFINITE일시 signaled 상태가 될때까지 반환하지 않음
  • fAlertable : TRUE일시 alertable wait 상태로 진입
  • 반환된 정수 값 - WSA_WAIT_EVENT_0 = lphEVents의 배열 기준으로 signaled 상태가 된 Event 오브젝트의 핸들이 저장된 인덱스, 둘 이상일시 그 중 작은 인덱스
  • 타임아웃시에는 WAIT_TIMEOUT 반환

전달할 수 잇는 최대 Event 오브젝트의 수가 64개임에 유의
Event 오브젝트가 manual-reset 모드이기 때문에 signaled 상태가 된 Event 오브젝트 전부를 아래와같이 확인할 수 있음

int posInfo = WSAWaitForMultipleEvents(numOfSock, hEventArray, FALSE, WSA_INFINITE, FALSE);
int startIdx = posInfo - WSA_WAIT_EVENT_0;
...
for (int i = startIdx; i < numOfSock; i++)
{
	int sigEventIdx = WSAWaitForMultipleEvents(1, &hEventArray[i], TRUE, 0, FALSE);
	...
}


Event 오브젝트가 signaled 상태가 된 원인을 확인하기 위해서는 WSAEnumNetworkEvents() 함수를 사용

#include <winsock2.h>

int WSAEnumNetworkEvents(SOCKET s, WSAEVENT hEventObject, LPWSANETWORKEVENTS lpNetworkEvents);
// 성공시 0, 실패시 SOCKET_ERROR 반환

typedef struct _WSANETWORKEVENTS
{
	long	lNetworkEvents;
	int		iErrorCode[FD_MAX_EVENTS];
} WSANETWORKEVENTS, *LPWSANETWORKEVNETS;
  • lpNetworkEvents : 발생한 이벤트의 유형정보 및 오류정보로 채워질 구조체 변수의 주소값 지정
  • WSAEnumNetworkEvents() 함수 호출시 Event 오브젝트가 non-signaled 상태로 되돌아감
  • 이벤트 FD_XXX 관련 오류 발생시 iErrorCode[FD_XXX_BIT]에 0 이외의 값이 저장됨

다음과 같은 방법으로 발생한 이벤트의 종류 확인 가능

WSANETWORKEVENTS	netEvents;
...
WSAEnumNetworkEvents(hSock, hEvent, &netEvents);
if (netEvents.lNetworkEvents & FD_ACCEPT)
{
	// FD_ACCEPT 이벤트 발생에 대한 처리
}
if (netEvents.lNetworkEvents & FD_READ)
{
	// FD_READ 이벤트 발생에 대한 처리
}
if (netEvents.lNetworkEvents & FD_CLOSE)
{
	// FD_CLOSE 이벤트 발생에 대한 처리
}


비동기 Notification IO 모델 에코 서버 예시

// AsynNotiEchoServ_win.cpp
// 클라이언트는 아무 에코 클라이언트나 사용

#include <iostream>
#include <winsock2.h>

using namespace std;

#define BUF_SIZE 100

void	CompressSockets(SOCKET hSockArr[], int idx, int total);
void	CompressEvents(WSAEVENT hEventArr[], int idx, int total);
void	ErrorHandling(char *msg);

int	main(int argc, char *argv[])
{
	WSADATA		wsaData;
	SOCKET		hServSock, hClntSock;
	SOCKADDR_IN	servAdr, clntAdr;

	SOCKET				hSockArr[WSA_MAXIMUM_WAIT_EVENTS];
	WSAEVENT			hEventArr[WSA_MAXIMUM_WAIT_EVENTS];
	WSAEVENT			newEvent;
	WSANETWORKEVENTS	netEvents;

	int		numOfClntSock = 0;
	int		strLen, posInfo, startIdx;
	int		clntAdrLen;
	char	msg[BUF_SIZE];

	if(argc != 2)
	{
		cout << "Usage : " << argv[0] << " <port>\n";
		exit(1);
	}

	if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0)
		ErrorHandling("WSAStartup() error");

	hServSock = socket(PF_INET, SOCK_STREAM, 0);
	memset(&servAdr, 0, sizeof(servAdr));
	servAdr.sin_family = AF_INET;
	servAdr.sin_addr.s_addr = htonl(INADDR_ANY);
	servAdr.sin_port = htons(atoi(argv[1]));

	if (::bind(hServSock, (SOCKADDR*)&servAdr, sizeof(servAdr)) == SOCKET_ERROR)
		ErrorHandling("bind() error");

	if (::listen(hServSock, 5) == SOCKET_ERROR)
		ErrorHandling("listen() error");

	newEvent = WSACreateEvent();
	if (WSAEventSelect(hServSock, newEvent, FD_ACCEPT) == SOCKET_ERROR)
		ErrorHandling("WSAEventSelect() error");
	
	hSockArr[numOfClntSock] = hServSock;
	hEventArr[numOfClntSock] = newEvent;
	numOfClntSock++;

	while (1)
	{
		posInfo = WSAWaitForMultipleEvents(numOfClntSock, hEventArr, FALSE, WSA_INFINITE, FALSE);
		startIdx = posInfo - WSA_WAIT_EVENT_0;

		for (int i = startIdx; i < numOfClntSock; i++)
		{
			int	sigEventIdx = WSAWaitForMultipleEvents(1, &hEventArr[i], TRUE, 0, FALSE);
			if ((sigEventIdx == WSA_WAIT_FAILED || sigEventIdx == WSA_WAIT_TIMEOUT))
				continue;
			else
			{
				sigEventIdx = i;
				WSAEnumNetworkEvents(hSockArr[sigEventIdx], hEventArr[sigEventIdx], &netEvents);
				if (netEvents.lNetworkEvents & FD_ACCEPT)	// 연결요청시
				{
					if (netEvents.iErrorCode[FD_ACCEPT_BIT] != 0)
					{
						cout << "Accept Error" << endl;
						break;
					}
					clntAdrLen = sizeof(clntAdr);
					hClntSock = accept(hSockArr[sigEventIdx], (SOCKADDR*)&clntAdr, &clntAdrLen);
					newEvent = WSACreateEvent();
					WSAEventSelect(hClntSock, newEvent, FD_READ | FD_CLOSE);

					hEventArr[numOfClntSock] = newEvent;
					hSockArr[numOfClntSock] = hClntSock;
					numOfClntSock++;
					cout << "connected new client...\n";
				}

				if (netEvents.lNetworkEvents & FD_READ)	// 데이터 수신시
				{
					if (netEvents.iErrorCode[FD_READ_BIT] != 0)
					{
						cout << "Read Error\n";
						break;
					}
					strLen = recv(hSockArr[sigEventIdx], msg, sizeof(msg), 0);
					send(hSockArr[sigEventIdx], msg, strLen, 0);
				}

				if (netEvents.lNetworkEvents & FD_CLOSE)	// 종료요청시
				{
					if (netEvents.iErrorCode[FD_CLOSE_BIT] != 0)
					{
						cout << "Read Error\n";
						break;
					}
					WSACloseEvent(hEventArr[sigEventIdx]);
					closesocket(hSockArr[sigEventIdx]);

					numOfClntSock--;
					CompressSockets(hSockArr, sigEventIdx, numOfClntSock);
					CompressEvents(hEventArr, sigEventIdx, numOfClntSock);
				}
			}
		}
	}
	WSACleanup();
	return 0;
}

void	CompressSockets(SOCKET hSockArr[], int idx, int total)
{
	for (int i = idx; i < total; i++)
		hSockArr[i] = hSockArr[i + 1];
}

void	CompressEvents(WSAEVENT hEventArr[], int idx, int total)
{
	for (int i = idx; i < total; i++)
		hEventArr[i] = hEventArr[i + 1];
}

void	ErrorHandling(char *msg)
{
	fputs(message, stderr);
	fputc('\n', stderr);
	exit(1);
}

내용 확인문제

  1. 동기 입출력과 비동기 입출력이 무엇인지, send & recv 함수를 기준으로 설명해보자. 그리고 동기 입출력의 단점은 무엇이고 이것이 비동기 입출력을 통해서 어떻게 해결이 되는지도 함께 설명하자.

    동기 입출력이란 send()recv() 처럼 입출력 함수의 반환 시점과 데이터 송수신 완료 시점이 일치하는 입출력을 뜻하고, 비동기 입출력은 두 시점이 일치하지 않는 입출력을 뜻함
    동기 입출력은 입출력 함수가 호출되는 동안 다른 작업이 이루어질 수 없기 때문에 비동기 입출력에서는 입출력 함수를 호출과 동시에 반환시켜 해결

  2. 모든 경우에 있어서 비동기 입출력이 최선의 선택이 될 수는 없다. 그렇다면 비동기 입출력의 단점은 무엇인가? 그리고 어떠한 경우에 동기 입출력이 좋은 선택이 될 수 있겠는가? 이에 대한 답을 내리기 위해서 비동기 입출력 관련 소스코드를 참고하기 바라며, 쓰레드와 관련해서도 여러분의 의견을 제시하기 바란다.

    비동기 입출력은 입출력의 완료 여부를 이후에 확인해야하기 때문에 서비스 형태가 간단하거나 응답에 필요한 데이터 크기가 작은 경우에는 불편할 수 있음
    특히 클라이언트 하나당 쓰레드를 하나씩 생성하는 서버 모델에서는 비동기 입출력을 진행할 필요가 없음

  3. select 방식과 관련된 다음 설명이 맞으면 O, 틀리면 X를 표시해보자.
    • select 방식은 호출된 함수의 반환을 통해서 IO 관련 이벤트의 발생을 알리니, Notification IO 모델이라 할 수 있다. (O)
    • select 방식은 IO 관련 이벤트의 발생시점과 호출된 함수의 반환시점이 일치하기 때문에 비동기 모델이 아니다. (O)
    • WSAEventSelect 함수는 select 방식의 비동기 모델이라 할 수 있다. IO 관련 이벤트의 발생을 비동기의 형태로 알리기 때문이다. (O)
  4. select 함수를 이용하는 방식과 WSAEventSelect 함수를 이용하는 방식의 차이점을 소스코드의 관점에서 설명해보자.

    select() 함수를 이용할 경우 관찰대상이 되는 소켓의 정보들을 호출시마다 매번 넘겨주어야 하지만, WSAEventSelect() 함수는 운영체제에게 한번만 전달하면 됨
    또한 select() 함수는 동기 입출력 방식이기 때문에 이벤트 발생시까지 블로킹되지만, WSAEventSelect() 함수는 비동기 입출력 방식이기때문에 블로킹되지 않음

  5. Chapter 17에서 소개한 epoll은 엣지 트리거 모드와 레벨 트리거 모드로 동작한다. 그렇다면 이 중에서 비동기 입출력이 잘 어울리는 모드는 무엇인가? 그리고 그 이유는 또 무엇이낙? 이와 관련해서 포괄적인 답을 해보자.

    엣지 트리거 모드는 데이터가 처음 들어올 때만 이벤트를 등록하기 때문에 비동기 입출력에서 문제상황이 발생할 일이 없지만, 레벨 트리거 모드의 경우 매번 새로 이벤트를 등록하기 때문에 처리중인 입출력과 새로 등록된 입출력의 구별이 불가능함

  6. 리눅스의 epoll 역시 비동기 입출력 모델이라 할 수 있다. 그렇다면 이를 비동기 입출력 모델이라 할 수 있는 이유에 대해서 설명해보자.

    epoll 방식은 이벤트의 등록(epoll_ctl())과 이벤트의 발생확인(epoll_wait()) 작업이 나누어져있기 때문에 이벤트 발생 이후 원하는 시점에 이벤트 발생유무를 확인할 수 있음

  7. WSAWaitForMultipleEvents 함수가 관찰할 수 있는 최대 핸들의 수는 어떻게 확인이 가능한가? 이의 확인을 위한 코드를 작성해서, 이 값을 실제로 확인해보자.

    WSA_MAXIMUM_WAIT_EVENTS 매크로 상수를 확인

  8. 비동기 Notification IO 모델에서 Event 오브젝트가 manual-reset 모드여야하는 이유를 설명해보자.

    이벤트 발생유무를 확인하기 위해 WSAWaitForMultipleEvents() 함수를 호출한 후, 이후 실제 이벤트 발생대상을 찾기 위해 추가로 WSAWaitForMultipleEvents() 함수를 호출하는데 이 때 이벤트가 auto-reset 모드일 경우 자동으로 non-signaled 상태로 변경되어 실제 이벤트 발생대상을 찾을 수 없음

  9. 이번 Chapter에서 설명한 비동기 Notification IO 모델을 바탕으로 채팅 서버를 구현해보자. 이 채팅 서버는 Chapter 20에서 소개한 채팅 클라이언트인 예제 chat_clnt_win.cpp와 함께 동작이 가능해야한다.

     // Asynchat_serv_win.cpp
    
     #include <iostream>
     #include <unistd.h>
     #include <windows.h>
     #include <process.h>
    
     using namespace std;
    
     #define BUF_SIZE 100
     #define MAX_CLNT 256
    
    
     void	CompressSockets(SOCKET hSockArr[], int idx, int total);
     void	CompressEvents(WSAEVENT hEventArr[], int idx, int total);
     void	SendMsg(SOCKET clntSocks[], int clntCnt, char *msg, int len);
     void	ErrorHandling(char* message);
    
     int	main(int argc, char *argv[])
     {
         WSADATA		wsaData;
         SOCKET		hServSock, hClntSock;
         SOCKADDR_IN	servAdr, clntAdr;
    
         SOCKET				hSockArr[WSA_MAXIMUM_WAIT_EVENTS];
         WSAEVENT			hEventArr[WSA_MAXIMUM_WAIT_EVENTS];
         WSAEVENT			newEvent;
         WSANETWORKEVENTS	netEvents;
    
         int		numOfClntSock = 0;
         int		strLen, posInfo, startIdx;
         int		clntAdrLen;
         char	msg[BUF_SIZE];
    
         if(argc != 2)
         {
             cout << "Usage : " << argv[0] << " <port>\n";
             exit(1);
         }
    
         if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0)
             ErrorHandling("WSAStartup() error");
    
         hServSock = socket(PF_INET, SOCK_STREAM, 0);
         memset(&servAdr, 0, sizeof(servAdr));
         servAdr.sin_family = AF_INET;
         servAdr.sin_addr.s_addr = htonl(INADDR_ANY);
         servAdr.sin_port = htons(atoi(argv[1]));
    
         if (::bind(hServSock, (SOCKADDR*)&servAdr, sizeof(servAdr)) == SOCKET_ERROR)
             ErrorHandling("bind() error");
    
         if (::listen(hServSock, 5) == SOCKET_ERROR)
             ErrorHandling("listen() error");
    
         newEvent = WSACreateEvent();
         if (WSAEventSelect(hServSock, newEvent, FD_ACCEPT) == SOCKET_ERROR)
             ErrorHandling("WSAEventSelect() error");
    		
         hSockArr[numOfClntSock] = hServSock;
         hEventArr[numOfClntSock] = newEvent;
         numOfClntSock++;
    
         while (1)
         {
             posInfo = WSAWaitForMultipleEvents(numOfClntSock, hEventArr, FALSE, WSA_INFINITE, FALSE);
             startIdx = posInfo - WSA_WAIT_EVENT_0;
    
             for (int i = startIdx; i < numOfClntSock; i++)
             {
                 int	sigEventIdx = WSAWaitForMultipleEvents(1, &hEventArr[i], TRUE, 0, FALSE);
                 if ((sigEventIdx == WSA_WAIT_FAILED || sigEventIdx == WSA_WAIT_TIMEOUT))
                     continue;
                 else
                 {
                     sigEventIdx = i;
                     WSAEnumNetworkEvents(hSockArr[sigEventIdx], hEventArr[sigEventIdx], &netEvents);
                     if (netEvents.lNetworkEvents & FD_ACCEPT)	// 연결요청시
                     {
                         if (netEvents.iErrorCode[FD_ACCEPT_BIT] != 0)
                         {
                             cout << "Accept Error" << endl;
                             break;
                         }
                         clntAdrLen = sizeof(clntAdr);
                         hClntSock = accept(hSockArr[sigEventIdx], (SOCKADDR*)&clntAdr, &clntAdrLen);
                         newEvent = WSACreateEvent();
                         WSAEventSelect(hClntSock, newEvent, FD_READ | FD_CLOSE);
    
                         hEventArr[numOfClntSock] = newEvent;
                         hSockArr[numOfClntSock] = hClntSock;
                         numOfClntSock++;
    
                         cout << "connected new client...\n";
                     }
    
                     if (netEvents.lNetworkEvents & FD_READ)	// 데이터 수신시
                     {
                         if (netEvents.iErrorCode[FD_READ_BIT] != 0)
                         {
                             cout << "Read Error\n";
                             break;
                         }
                         strLen = recv(hSockArr[sigEventIdx], msg, sizeof(msg), 0);
                         SendMsg(hSockArr, numOfClntSock, msg, strLen);
                     }
    
                     if (netEvents.lNetworkEvents & FD_CLOSE)	// 종료요청시
                     {
                         if (netEvents.iErrorCode[FD_CLOSE_BIT] != 0)
                         {
                             cout << "Read Error\n";
                             break;
                         }
                         WSACloseEvent(hEventArr[sigEventIdx]);
                         closesocket(hSockArr[sigEventIdx]);
    
                         numOfClntSock--;
                         CompressSockets(hSockArr, sigEventIdx, numOfClntSock);
                         CompressEvents(hEventArr, sigEventIdx, numOfClntSock);
                     }
                 }
             }
         }
         closesocket(hServSock);
         WSACleanup();
         return 0;
     }
    
     void	SendMsg(SOCKET clntSocks[], int clntCnt, char *msg, int len)	// send to all
     {
         for (int i = 0; i < clntCnt; i++)
             send(clntSocks[i], msg, len, 0);
     }
    
     void	CompressSockets(SOCKET hSockArr[], int idx, int total)
     {
         for (int i = idx; i < total; i++)
             hSockArr[i] = hSockArr[i + 1];
     }
    
     void	CompressEvents(WSAEVENT hEventArr[], int idx, int total)
     {
         for (int i = idx; i < total; i++)
             hEventArr[i] = hEventArr[i + 1];
     }
    
     void	ErrorHandling(char *message)
     {
         fputs(message, stderr);
         fputc('\n', stderr);
         exit(1);
     }