Chapter 21 - Asynchronous Notification IO 모델
Chapter 21 - Asynchronous Notification IO 모델
21-1 : 비동기(Asynchronous) Notification IO 모델의 이해
send()
와 recv()
함수는 동기(Synchronous)화된 입출력 함수
send()
함수가 호출되는 순간부터 데이터의 전송이 시작되고,send()
함수의 호출이 완료(반환)되는 순간 데이터의 전송이 완료됨recv()
함수가 호출되는 순간부터 데이터의 수신이 시작되고,recv()
함수의 호출이 완료(반환)되는 순간 데이터의 수신이 완료됨- 즉, 입출력 함수의 반환시점과 데이터 송수신의 완료시점이 일치
동기화된 입출력은 입출력이 진행되는 동안 호출된 함수가 반환을 하지 않는다는 단점이 존재함
따라서 보다 효율적으로 CPU를 활용하기 위해서는 비동기 방식을 사용해야함
동기와 비동기 개념은 입출력 뿐만 아니라 Notification IO
에도 존재
Notification IO
: 입력버퍼에 데이터가 수신되어 데이터의 수신이 필요하거나, 출력버퍼가 비어 데이터의 전송이 가능한 상황의 알림- 대표적인
Notification IO
방식에는select
방식이 존재 select()
는 호출된 함수의 반환 시점이 IO가 필요하거나 가능한 상황이 발생한(IO 관련 이벤트가 발생한) 시점과 일치하기 때문에 동기 방식임- 이와달리 IO의 상태에 상관없이 반환이 이루어지면 비동기 방식이며,
WSAEventSelect()
함수가 해당됨 - 비동기
Notification IO
의 경우 IO의 상태변화를 별도로 관찬ㄹ해야함
21-2 : 비동기(Asynchronous) Notification IO 모델의 이해와 구현
비동기 Notification IO
모델 구현에는 WSAEventSelect()
함수를 사용하는 방법과 WSAAsyncSelect()
함수를 사용하는 방법이 존재
단, WSAAsyncSelect()
함수를 사용하기 위해서는 발생한 이벤트를 수신할 윈도우의 핸들을 지정해야함
IO의 상태변화에는 소켓의 상태변화와 소켓의 이벤트 발생이 있음
WSAEventSelect()
함수는 임의의 소켓을 대상으로 이벤트 발생여부의 관찰을 명령할 때 사용
#include <winsock2.h>
int WSAEventSelect(SOCKET s, WSAEVENT hEventObject, long lNetworkEvents);
// 성공시 0, 실패시 SOCKET_ERROR 반환
hEventObject
: 이벤트 발생유무의 확인을 위한Event
오브젝트 핸들 전달lNetworkEvents
: 감시하고자 하는 이벤트의 유형에 대한 정보FD_READ
: 수신할 데이터 존재여부FD_WRITE
: 블로킹 없이 데이터 전송 가능 여부FD_OOB
:Out-of-band
데이터 수신 여부FD_ACCEPT
: 연결요청 여부FD_CLOSE
: 연결종료 요청 여부
WSAEventSelect()
함수는 지정한 이벤트 중 하나가 소켓에서 발생하면 Event
오브젝트를 signaled
상태로 바꿈
- 즉,
Event
오브젝트와 소켓을 연결하는 역할을 함 - 비동기 방식이기 때문에 이베트 발생유무에 상관없이 바로 반환하며, 따라서 즉시 다른 작업을 진행할 수 있음
WSAEventSelect()
함수호출을 통해 전달된 소켓의 정보는 운영체제에 등록되기 때문에 추후 재호출이 불필요함
WSAEventSelect()
에서 사용되는 Event
오브젝트는 manual-reset
모드이면서 non-signaled
상태이기 때문에 CreateEvent()
함수가 아닌 WSACreateEvent()
함수를 사용하여 생성하는 것이 편리
#include <winsock2.h>
WSAEVENT WSACreateEvent(void);
// 성공시 Event 오브젝트 핸들, 실패시 WSA_INVALID_EVENT 반환
// 반환형인 `WSAEVENT`는 내부적으로 `HANDLE`로 정의
Event
오브젝트의 종료에는 WSACloseEvent()
함수를 사용
#include <winsock2.h>
BOOL WSACloseEvent(WSAEVENT hEvent);
// 성공시 TRUE, 실패시 FALSE 반환
이벤트의 발생유무 확인에는 WSAWaitForMultipleEvents()
함수를 사용
#include <winsock2.h>
DWORD WSAWaitForMultipleEvents(DWORD cEvents, const WSAEVENT *lphEvents, BOOL fwaitAll, DWORD dwTimeout, BOOL fAlertable);
// 성공시 이벤트 발생 오브젝트 관련정보, 실패시 WSA_INVALID_EVENT 반환
cEvents
: 확인할Event
오브젝트의 개수 정보lphEvents
:Event
오브젝트 핸들의 배열 주소값fWaitAll
: TRUE일시 모든Event
오브젝트가signaled
일 때, FALSE일시 하나라도signaled
일 때 반환dwTimeout
: 1/1000초 단위로 타임아웃 지정,WSA_INFINITE
일시signaled
상태가 될때까지 반환하지 않음fAlertable
: TRUE일시alertable wait
상태로 진입- 반환된 정수 값 -
WSA_WAIT_EVENT_0
=lphEVents
의 배열 기준으로signaled
상태가 된Event
오브젝트의 핸들이 저장된 인덱스, 둘 이상일시 그 중 작은 인덱스 - 타임아웃시에는
WAIT_TIMEOUT
반환
전달할 수 잇는 최대 Event
오브젝트의 수가 64개임에 유의
Event
오브젝트가 manual-reset
모드이기 때문에 signaled
상태가 된 Event
오브젝트 전부를 아래와같이 확인할 수 있음
int posInfo = WSAWaitForMultipleEvents(numOfSock, hEventArray, FALSE, WSA_INFINITE, FALSE);
int startIdx = posInfo - WSA_WAIT_EVENT_0;
...
for (int i = startIdx; i < numOfSock; i++)
{
int sigEventIdx = WSAWaitForMultipleEvents(1, &hEventArray[i], TRUE, 0, FALSE);
...
}
Event
오브젝트가 signaled
상태가 된 원인을 확인하기 위해서는 WSAEnumNetworkEvents()
함수를 사용
#include <winsock2.h>
int WSAEnumNetworkEvents(SOCKET s, WSAEVENT hEventObject, LPWSANETWORKEVENTS lpNetworkEvents);
// 성공시 0, 실패시 SOCKET_ERROR 반환
typedef struct _WSANETWORKEVENTS
{
long lNetworkEvents;
int iErrorCode[FD_MAX_EVENTS];
} WSANETWORKEVENTS, *LPWSANETWORKEVNETS;
lpNetworkEvents
: 발생한 이벤트의 유형정보 및 오류정보로 채워질 구조체 변수의 주소값 지정WSAEnumNetworkEvents()
함수 호출시Event
오브젝트가non-signaled
상태로 되돌아감- 이벤트
FD_XXX
관련 오류 발생시iErrorCode[FD_XXX_BIT]
에 0 이외의 값이 저장됨
다음과 같은 방법으로 발생한 이벤트의 종류 확인 가능
WSANETWORKEVENTS netEvents;
...
WSAEnumNetworkEvents(hSock, hEvent, &netEvents);
if (netEvents.lNetworkEvents & FD_ACCEPT)
{
// FD_ACCEPT 이벤트 발생에 대한 처리
}
if (netEvents.lNetworkEvents & FD_READ)
{
// FD_READ 이벤트 발생에 대한 처리
}
if (netEvents.lNetworkEvents & FD_CLOSE)
{
// FD_CLOSE 이벤트 발생에 대한 처리
}
비동기 Notification IO
모델 에코 서버 예시
// AsynNotiEchoServ_win.cpp
// 클라이언트는 아무 에코 클라이언트나 사용
#include <iostream>
#include <winsock2.h>
using namespace std;
#define BUF_SIZE 100
void CompressSockets(SOCKET hSockArr[], int idx, int total);
void CompressEvents(WSAEVENT hEventArr[], int idx, int total);
void ErrorHandling(char *msg);
int main(int argc, char *argv[])
{
WSADATA wsaData;
SOCKET hServSock, hClntSock;
SOCKADDR_IN servAdr, clntAdr;
SOCKET hSockArr[WSA_MAXIMUM_WAIT_EVENTS];
WSAEVENT hEventArr[WSA_MAXIMUM_WAIT_EVENTS];
WSAEVENT newEvent;
WSANETWORKEVENTS netEvents;
int numOfClntSock = 0;
int strLen, posInfo, startIdx;
int clntAdrLen;
char msg[BUF_SIZE];
if(argc != 2)
{
cout << "Usage : " << argv[0] << " <port>\n";
exit(1);
}
if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0)
ErrorHandling("WSAStartup() error");
hServSock = socket(PF_INET, SOCK_STREAM, 0);
memset(&servAdr, 0, sizeof(servAdr));
servAdr.sin_family = AF_INET;
servAdr.sin_addr.s_addr = htonl(INADDR_ANY);
servAdr.sin_port = htons(atoi(argv[1]));
if (::bind(hServSock, (SOCKADDR*)&servAdr, sizeof(servAdr)) == SOCKET_ERROR)
ErrorHandling("bind() error");
if (::listen(hServSock, 5) == SOCKET_ERROR)
ErrorHandling("listen() error");
newEvent = WSACreateEvent();
if (WSAEventSelect(hServSock, newEvent, FD_ACCEPT) == SOCKET_ERROR)
ErrorHandling("WSAEventSelect() error");
hSockArr[numOfClntSock] = hServSock;
hEventArr[numOfClntSock] = newEvent;
numOfClntSock++;
while (1)
{
posInfo = WSAWaitForMultipleEvents(numOfClntSock, hEventArr, FALSE, WSA_INFINITE, FALSE);
startIdx = posInfo - WSA_WAIT_EVENT_0;
for (int i = startIdx; i < numOfClntSock; i++)
{
int sigEventIdx = WSAWaitForMultipleEvents(1, &hEventArr[i], TRUE, 0, FALSE);
if ((sigEventIdx == WSA_WAIT_FAILED || sigEventIdx == WSA_WAIT_TIMEOUT))
continue;
else
{
sigEventIdx = i;
WSAEnumNetworkEvents(hSockArr[sigEventIdx], hEventArr[sigEventIdx], &netEvents);
if (netEvents.lNetworkEvents & FD_ACCEPT) // 연결요청시
{
if (netEvents.iErrorCode[FD_ACCEPT_BIT] != 0)
{
cout << "Accept Error" << endl;
break;
}
clntAdrLen = sizeof(clntAdr);
hClntSock = accept(hSockArr[sigEventIdx], (SOCKADDR*)&clntAdr, &clntAdrLen);
newEvent = WSACreateEvent();
WSAEventSelect(hClntSock, newEvent, FD_READ | FD_CLOSE);
hEventArr[numOfClntSock] = newEvent;
hSockArr[numOfClntSock] = hClntSock;
numOfClntSock++;
cout << "connected new client...\n";
}
if (netEvents.lNetworkEvents & FD_READ) // 데이터 수신시
{
if (netEvents.iErrorCode[FD_READ_BIT] != 0)
{
cout << "Read Error\n";
break;
}
strLen = recv(hSockArr[sigEventIdx], msg, sizeof(msg), 0);
send(hSockArr[sigEventIdx], msg, strLen, 0);
}
if (netEvents.lNetworkEvents & FD_CLOSE) // 종료요청시
{
if (netEvents.iErrorCode[FD_CLOSE_BIT] != 0)
{
cout << "Read Error\n";
break;
}
WSACloseEvent(hEventArr[sigEventIdx]);
closesocket(hSockArr[sigEventIdx]);
numOfClntSock--;
CompressSockets(hSockArr, sigEventIdx, numOfClntSock);
CompressEvents(hEventArr, sigEventIdx, numOfClntSock);
}
}
}
}
WSACleanup();
return 0;
}
void CompressSockets(SOCKET hSockArr[], int idx, int total)
{
for (int i = idx; i < total; i++)
hSockArr[i] = hSockArr[i + 1];
}
void CompressEvents(WSAEVENT hEventArr[], int idx, int total)
{
for (int i = idx; i < total; i++)
hEventArr[i] = hEventArr[i + 1];
}
void ErrorHandling(char *msg)
{
fputs(message, stderr);
fputc('\n', stderr);
exit(1);
}
내용 확인문제
-
동기 입출력과 비동기 입출력이 무엇인지,
send
&recv
함수를 기준으로 설명해보자. 그리고 동기 입출력의 단점은 무엇이고 이것이 비동기 입출력을 통해서 어떻게 해결이 되는지도 함께 설명하자.동기 입출력이란
send()
나recv()
처럼 입출력 함수의 반환 시점과 데이터 송수신 완료 시점이 일치하는 입출력을 뜻하고, 비동기 입출력은 두 시점이 일치하지 않는 입출력을 뜻함
동기 입출력은 입출력 함수가 호출되는 동안 다른 작업이 이루어질 수 없기 때문에 비동기 입출력에서는 입출력 함수를 호출과 동시에 반환시켜 해결 -
모든 경우에 있어서 비동기 입출력이 최선의 선택이 될 수는 없다. 그렇다면 비동기 입출력의 단점은 무엇인가? 그리고 어떠한 경우에 동기 입출력이 좋은 선택이 될 수 있겠는가? 이에 대한 답을 내리기 위해서 비동기 입출력 관련 소스코드를 참고하기 바라며, 쓰레드와 관련해서도 여러분의 의견을 제시하기 바란다.
비동기 입출력은 입출력의 완료 여부를 이후에 확인해야하기 때문에 서비스 형태가 간단하거나 응답에 필요한 데이터 크기가 작은 경우에는 불편할 수 있음
특히 클라이언트 하나당 쓰레드를 하나씩 생성하는 서버 모델에서는 비동기 입출력을 진행할 필요가 없음 select
방식과 관련된 다음 설명이 맞으면 O, 틀리면 X를 표시해보자.select
방식은 호출된 함수의 반환을 통해서 IO 관련 이벤트의 발생을 알리니,Notification IO
모델이라 할 수 있다. (O)select
방식은 IO 관련 이벤트의 발생시점과 호출된 함수의 반환시점이 일치하기 때문에 비동기 모델이 아니다. (O)WSAEventSelect
함수는select
방식의 비동기 모델이라 할 수 있다. IO 관련 이벤트의 발생을 비동기의 형태로 알리기 때문이다. (O)
-
select
함수를 이용하는 방식과WSAEventSelect
함수를 이용하는 방식의 차이점을 소스코드의 관점에서 설명해보자.select()
함수를 이용할 경우 관찰대상이 되는 소켓의 정보들을 호출시마다 매번 넘겨주어야 하지만,WSAEventSelect()
함수는 운영체제에게 한번만 전달하면 됨
또한select()
함수는 동기 입출력 방식이기 때문에 이벤트 발생시까지 블로킹되지만,WSAEventSelect()
함수는 비동기 입출력 방식이기때문에 블로킹되지 않음 -
Chapter 17에서 소개한
epoll
은 엣지 트리거 모드와 레벨 트리거 모드로 동작한다. 그렇다면 이 중에서 비동기 입출력이 잘 어울리는 모드는 무엇인가? 그리고 그 이유는 또 무엇이낙? 이와 관련해서 포괄적인 답을 해보자.엣지 트리거 모드는 데이터가 처음 들어올 때만 이벤트를 등록하기 때문에 비동기 입출력에서 문제상황이 발생할 일이 없지만, 레벨 트리거 모드의 경우 매번 새로 이벤트를 등록하기 때문에 처리중인 입출력과 새로 등록된 입출력의 구별이 불가능함
-
리눅스의
epoll
역시 비동기 입출력 모델이라 할 수 있다. 그렇다면 이를 비동기 입출력 모델이라 할 수 있는 이유에 대해서 설명해보자.epoll
방식은 이벤트의 등록(epoll_ctl()
)과 이벤트의 발생확인(epoll_wait()
) 작업이 나누어져있기 때문에 이벤트 발생 이후 원하는 시점에 이벤트 발생유무를 확인할 수 있음 -
WSAWaitForMultipleEvents
함수가 관찰할 수 있는 최대 핸들의 수는 어떻게 확인이 가능한가? 이의 확인을 위한 코드를 작성해서, 이 값을 실제로 확인해보자.WSA_MAXIMUM_WAIT_EVENTS
매크로 상수를 확인 -
비동기
Notification IO
모델에서Event
오브젝트가manual-reset
모드여야하는 이유를 설명해보자.이벤트 발생유무를 확인하기 위해
WSAWaitForMultipleEvents()
함수를 호출한 후, 이후 실제 이벤트 발생대상을 찾기 위해 추가로WSAWaitForMultipleEvents()
함수를 호출하는데 이 때 이벤트가auto-reset
모드일 경우 자동으로non-signaled
상태로 변경되어 실제 이벤트 발생대상을 찾을 수 없음 -
이번 Chapter에서 설명한 비동기
Notification IO
모델을 바탕으로 채팅 서버를 구현해보자. 이 채팅 서버는 Chapter 20에서 소개한 채팅 클라이언트인 예제chat_clnt_win.cpp
와 함께 동작이 가능해야한다.// Asynchat_serv_win.cpp #include <iostream> #include <unistd.h> #include <windows.h> #include <process.h> using namespace std; #define BUF_SIZE 100 #define MAX_CLNT 256 void CompressSockets(SOCKET hSockArr[], int idx, int total); void CompressEvents(WSAEVENT hEventArr[], int idx, int total); void SendMsg(SOCKET clntSocks[], int clntCnt, char *msg, int len); void ErrorHandling(char* message); int main(int argc, char *argv[]) { WSADATA wsaData; SOCKET hServSock, hClntSock; SOCKADDR_IN servAdr, clntAdr; SOCKET hSockArr[WSA_MAXIMUM_WAIT_EVENTS]; WSAEVENT hEventArr[WSA_MAXIMUM_WAIT_EVENTS]; WSAEVENT newEvent; WSANETWORKEVENTS netEvents; int numOfClntSock = 0; int strLen, posInfo, startIdx; int clntAdrLen; char msg[BUF_SIZE]; if(argc != 2) { cout << "Usage : " << argv[0] << " <port>\n"; exit(1); } if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) ErrorHandling("WSAStartup() error"); hServSock = socket(PF_INET, SOCK_STREAM, 0); memset(&servAdr, 0, sizeof(servAdr)); servAdr.sin_family = AF_INET; servAdr.sin_addr.s_addr = htonl(INADDR_ANY); servAdr.sin_port = htons(atoi(argv[1])); if (::bind(hServSock, (SOCKADDR*)&servAdr, sizeof(servAdr)) == SOCKET_ERROR) ErrorHandling("bind() error"); if (::listen(hServSock, 5) == SOCKET_ERROR) ErrorHandling("listen() error"); newEvent = WSACreateEvent(); if (WSAEventSelect(hServSock, newEvent, FD_ACCEPT) == SOCKET_ERROR) ErrorHandling("WSAEventSelect() error"); hSockArr[numOfClntSock] = hServSock; hEventArr[numOfClntSock] = newEvent; numOfClntSock++; while (1) { posInfo = WSAWaitForMultipleEvents(numOfClntSock, hEventArr, FALSE, WSA_INFINITE, FALSE); startIdx = posInfo - WSA_WAIT_EVENT_0; for (int i = startIdx; i < numOfClntSock; i++) { int sigEventIdx = WSAWaitForMultipleEvents(1, &hEventArr[i], TRUE, 0, FALSE); if ((sigEventIdx == WSA_WAIT_FAILED || sigEventIdx == WSA_WAIT_TIMEOUT)) continue; else { sigEventIdx = i; WSAEnumNetworkEvents(hSockArr[sigEventIdx], hEventArr[sigEventIdx], &netEvents); if (netEvents.lNetworkEvents & FD_ACCEPT) // 연결요청시 { if (netEvents.iErrorCode[FD_ACCEPT_BIT] != 0) { cout << "Accept Error" << endl; break; } clntAdrLen = sizeof(clntAdr); hClntSock = accept(hSockArr[sigEventIdx], (SOCKADDR*)&clntAdr, &clntAdrLen); newEvent = WSACreateEvent(); WSAEventSelect(hClntSock, newEvent, FD_READ | FD_CLOSE); hEventArr[numOfClntSock] = newEvent; hSockArr[numOfClntSock] = hClntSock; numOfClntSock++; cout << "connected new client...\n"; } if (netEvents.lNetworkEvents & FD_READ) // 데이터 수신시 { if (netEvents.iErrorCode[FD_READ_BIT] != 0) { cout << "Read Error\n"; break; } strLen = recv(hSockArr[sigEventIdx], msg, sizeof(msg), 0); SendMsg(hSockArr, numOfClntSock, msg, strLen); } if (netEvents.lNetworkEvents & FD_CLOSE) // 종료요청시 { if (netEvents.iErrorCode[FD_CLOSE_BIT] != 0) { cout << "Read Error\n"; break; } WSACloseEvent(hEventArr[sigEventIdx]); closesocket(hSockArr[sigEventIdx]); numOfClntSock--; CompressSockets(hSockArr, sigEventIdx, numOfClntSock); CompressEvents(hEventArr, sigEventIdx, numOfClntSock); } } } } closesocket(hServSock); WSACleanup(); return 0; } void SendMsg(SOCKET clntSocks[], int clntCnt, char *msg, int len) // send to all { for (int i = 0; i < clntCnt; i++) send(clntSocks[i], msg, len, 0); } void CompressSockets(SOCKET hSockArr[], int idx, int total) { for (int i = idx; i < total; i++) hSockArr[i] = hSockArr[i + 1]; } void CompressEvents(WSAEVENT hEventArr[], int idx, int total) { for (int i = idx; i < total; i++) hEventArr[i] = hEventArr[i + 1]; } void ErrorHandling(char *message) { fputs(message, stderr); fputc('\n', stderr); exit(1); }