IOCP 모델
- Proactor 방식 고속 입/출력 모델
- 입출력이 끝날 때까지 대기를 계속하는것이 아니라 callback 처럼 입출력이 끝날시에 어떤것을 해줘! 하는 방식
- 비동기 I/O 통지(Callback) 구조
- 사용자 요청에 대한 처리 스레드 풀을 OS가 직접관리
- 사용자가 스레드풀을 만들어서 처리하는것이 아님
- 커널영역에서 사용자 메모리 영역을 공유해 불필요한 메모리 복사 방지
- 입/ 출력 처리시 관련 메모리에 대해 페이지 단위 Lock/Unlock
- Callback 함수는 사용자 모드 함수이나 커널에서 호출하며 이때마다 스위칭
VMS의 특정 메모리에 쓸 data를 쓰고 I/O request가 IOCP queue에 들어가게 되면
kernel io작업을 할때 특정 메모리를 Lock을 잡아서 오염되지 않게 하고 io처리를 하고 나면 Unlock해준다.
NIC 이 scatterd 를 지원하면 DMA의 경우 바로 특정 메모리에 써주기때문에 불필요한 kernel -> app buffer copy가 일어나지 않는다.
CreateIOCP 를 통해 IOCP Queue등 IOCP가 동작할 수 있는 환경이 생성된다.
IO할 스레드들도 미리 생성이 된다.
client socket이 accept에 의해 생성이 될때마다 IOCP가 감시하게 된다.
WSARecv() 는 ReadFileEx와 같이 I/O Pending 이 될것이다. OS가 data처리를 다 하기 전까지
GQCS() = GetQueueCompletionStatus , IOCP queue가 request를 다 처리하기 전까지 Wait하는 상태에 빠져있다.
IOCP 기반 채팅 서버
typedef struct _USERSESSION
{
SOCKET hSocket;
char buffer[8192]; //8KB
} USERSESSION;
/////////////////////////////////////////////////////////////////////////
//클라이언트 처리를 위한 작업자 스레드 개수.
#define MAX_THREAD_CNT 4
CRITICAL_SECTION g_cs; //스레드 동기화 객체
std::list<SOCKET> g_listClient; //연결된 클라이언트 소켓 리스트.
SOCKET g_hSocket; //서버의 리슨 소켓
HANDLE g_hIocp;
client socket을 USERSESSION 으로 랩핑하였다.
작업자 스레드 개수는 GQCS를 호출하는 client의request를 처리할 스레드 갯수를 의미하고 cpu 코어 갯수에 따라 조절을 할 필요가 있다.
//IOCP 생성
g_hIocp = ::CreateIoCompletionPort(
INVALID_HANDLE_VALUE, //연결된 파일 없음.
NULL, //기존 핸들 없음.
0, //식별자(Key) 해당되지 않음.
0); //스레드 개수는 OS에 맡김.
if (g_hIocp == NULL)
{
puts("ERORR: IOCP를 생성할 수 없습니다.");
return 0;
}
IOCP Queue 및 환경 관리쳬게가 생성이 된다.
//IOCP 스레드들 생성
HANDLE hThread;
DWORD dwThreadID;
for (int i = 0; i < MAX_THREAD_CNT; ++i)
{
dwThreadID = 0;
//클라이언트로부터 문자열을 수신함.
hThread = ::CreateThread(NULL, //보안속성 상속
0, //스택 메모리는 기본크기(1MB)
ThreadComplete, //스래드로 실행할 함수이름
(LPVOID)NULL, //
0, //생성 플래그는 기본값 사용
&dwThreadID); //생성된 스레드ID가 저장될 변수주소
::CloseHandle(hThread);
}
client Request를 처리할 스레드 생성
//서버 리슨 소켓 생성
g_hSocket = ::WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP,
NULL, 0, WSA_FLAG_OVERLAPPED);
//bind()/listen()
SOCKADDR_IN addrsvr;
addrsvr.sin_family = AF_INET;
addrsvr.sin_addr.S_un.S_addr = ::htonl(INADDR_ANY);
addrsvr.sin_port = ::htons(25000);
기존 Socket함수랑 다르고 Overlapped를 지원하게 된다.
DWORD WINAPI ThreadAcceptLoop(LPVOID pParam)
{
LPWSAOVERLAPPED pWol = NULL;
DWORD dwReceiveSize, dwFlag;
USERSESSION *pNewUser;
int nAddrSize = sizeof(SOCKADDR);
WSABUF wsaBuf;
SOCKADDR ClientAddr;
SOCKET hClient;
int nRecvResult = 0;
while ((hClient = ::accept(g_hSocket,
&ClientAddr, &nAddrSize)) != INVALID_SOCKET)
{
puts("새 클라이언트가 연결됐습니다.");
::EnterCriticalSection(&g_cs);
g_listClient.push_back(hClient);
::LeaveCriticalSection(&g_cs);
//새 클라이언트에 대한 세션 객체 생성
pNewUser = new USERSESSION;
::ZeroMemory(pNewUser, sizeof(USERSESSION));
pNewUser->hSocket = hClient;
//비동기 수신 처리를 위한 OVERLAPPED 구조체 생성.
pWol = new WSAOVERLAPPED;
::ZeroMemory(pWol, sizeof(WSAOVERLAPPED));
//(연결된) 클라이언트 소켓 핸들을 IOCP에 연결.
::CreateIoCompletionPort( (HANDLE)hClient, g_hIocp,
(ULONG_PTR)pNewUser, //KEY!!!
0);
dwReceiveSize = 0;
dwFlag = 0;
wsaBuf.buf = pNewUser->buffer;
wsaBuf.len = sizeof(pNewUser->buffer);
//클라이언트가 보낸 정보를 비동기 수신한다.
nRecvResult = ::WSARecv(hClient, &wsaBuf, 1, &dwReceiveSize,
&dwFlag, pWol, NULL);
if (::WSAGetLastError() != WSA_IO_PENDING)
puts("ERROR: WSARecv() != WSA_IO_PENDING");
}
return 0;
}
CreateIoCompletionPort = iocp Queue에 hclient를 감시해달라 요청하는 역할이다.
pNewUser 가 key역할을 한다. malloc (new USERSESSION) 을 통해 새로운 주소를 KEY로 쓰는것이다.
이런방식으로 key를 많이 쓴다
wsaBuf를 WSARecv 를 통해 socket을 통해 들어올 데이터를 저장해야할 장소를 지정해준다.
마지막 인자가 Null인데 보통 완료시에 호출할 callback을 넣어주지만 이 경우에는 IOCP라는 구조에 의해서 호출이 되게된다.
DWORD WINAPI ThreadComplete(LPVOID pParam)
{
DWORD dwTransferredSize = 0;
DWORD dwFlag = 0;
USERSESSION *pSession = NULL;
LPWSAOVERLAPPED pWol = NULL;
BOOL bResult;
puts("[IOCP 작업자 스레드 시작]");
while (1)
{
bResult = ::GetQueuedCompletionStatus(
g_hIocp, //Dequeue할 IOCP 핸들.
&dwTransferredSize, //수신한 데이터 크기.
(PULONG_PTR)&pSession, //수신된 데이터가 저장된 메모리
&pWol, //OVERLAPPED 구조체.
INFINITE); //이벤트를 무한정 대기.
if (bResult == TRUE)
{
//정상적인 경우.
/////////////////////////////////////////////////////////////
//1. 클라이언트가 소켓을 정상적으로 닫고 연결을 끊은 경우.
if (dwTransferredSize == 0)
{
CloseClient(pSession->hSocket);
delete pWol;
delete pSession;
puts("\tGQCS: 클라이언트가 정상적으로 연결을 종료함.");
}
/////////////////////////////////////////////////////////////
//2. 클라이언트가 보낸 데이터를 수신한 경우.
else
{
SendMessageAll(pSession->buffer, dwTransferredSize);
memset(pSession->buffer, 0, sizeof(pSession->buffer));
//다시 IOCP에 등록.
DWORD dwReceiveSize = 0;
DWORD dwFlag = 0;
WSABUF wsaBuf = { 0 };
wsaBuf.buf = pSession->buffer;
wsaBuf.len = sizeof(pSession->buffer);
::WSARecv(
pSession->hSocket, //클라이언트 소켓 핸들
&wsaBuf, //WSABUF 구조체 배열의 주소
1, //배열 요소의 개수
&dwReceiveSize,
&dwFlag,
pWol,
NULL);
if (::WSAGetLastError() != WSA_IO_PENDING)
puts("\tGQCS: ERROR: WSARecv()");
}
}
else
{
//비정상적인 경우.
/////////////////////////////////////////////////////////////
//3. 완료 큐에서 완료 패킷을 꺼내지 못하고 반환한 경우.
if (pWol == NULL)
{
//IOCP 핸들이 닫힌 경우(서버를 종료하는 경우)도 해당된다.
puts("\tGQCS: IOCP 핸들이 닫혔습니다.");
break;
}
/////////////////////////////////////////////////////////////
//4. 클라이언트가 비정상적으로 종료됐거나
// 서버가 먼저 연결을 종료한 경우.
else
{
if (pSession != NULL)
{
CloseClient(pSession->hSocket);
delete pWol;
delete pSession;
}
puts("\tGQCS: 서버 종료 혹은 비정상적 연결 종료");
}
}
}
puts("[IOCP 작업자 스레드 종료]");
return 0;
}
worker thread가 호출할 함수이다.
GQCS 를 call하면서 alterable wait으로 빠지게 된다.
위에서 key로 New Userssion으로 만든 memory주소 값을 통해서 여기서는 session 포인터로 참조하여 사용자를 구별 할 수 있게 된다.
client의 data를 수신한후에 다시 동일하게 IOCP에게 client socket을 등록하는 과정이 존재한다.
'Network > Windows 소켓 프로그래밍 입문에서 고성능 서버까지!' 카테고리의 다른 글
UDP와 브로드캐스트 (0) | 2023.11.30 |
---|---|
파일 송/수신과 프로토콜 설계 (0) | 2023.11.27 |
파일 송/수신 (0) | 2023.11.23 |
TCP 채팅 서버 - 성능 개선 (0) | 2023.11.22 |
TCP 채팅 서버 - 기본 이론 (0) | 2023.11.20 |