Network/Windows 소켓 프로그래밍 입문에서 고성능 서버까지!

서버 성능 개선

Tony Lim 2023. 11. 28. 09:33

IOCP 모델

  • Proactor 방식 고속 입/출력 모델
    • 입출력이 끝날 때까지 대기를 계속하는것이 아니라 callback 처럼 입출력이 끝날시에 어떤것을 해줘! 하는 방식
  • 비동기 I/O 통지(Callback) 구조
  • 사용자 요청에 대한 처리 스레드 풀을 OS가 직접관리
    • 사용자가 스레드풀을 만들어서 처리하는것이 아님
  • 커널영역에서 사용자 메모리 영역을 공유해 불필요한 메모리 복사 방지
  • 입/ 출력 처리시 관련 메모리에 대해 페이지 단위 Lock/Unlock
  • Callback 함수는 사용자 모드 함수이나 커널에서 호출하며 이때마다 스위칭

 

 VMS의 특정 메모리에 쓸 data를 쓰고 I/O request가 IOCP queue에 들어가게 되면

kernel io작업을 할때 특정 메모리를 Lock을 잡아서 오염되지 않게 하고 io처리를 하고 나면 Unlock해준다.

NIC 이 scatterd 를 지원하면 DMA의 경우 바로 특정 메모리에 써주기때문에 불필요한 kernel -> app buffer copy가 일어나지 않는다.

 

CreateIOCP 를 통해 IOCP Queue등 IOCP가 동작할 수 있는 환경이 생성된다.

IO할 스레드들도 미리 생성이 된다.

client socket이 accept에 의해 생성이 될때마다 IOCP가 감시하게 된다.

WSARecv() 는 ReadFileEx와 같이 I/O Pending 이 될것이다. OS가 data처리를 다 하기 전까지

GQCS() = GetQueueCompletionStatus , IOCP queue가 request를 다 처리하기 전까지 Wait하는 상태에 빠져있다.


IOCP 기반 채팅 서버

typedef struct _USERSESSION
{
	SOCKET	hSocket;
	char	buffer[8192];	//8KB
} USERSESSION;

/////////////////////////////////////////////////////////////////////////
//클라이언트 처리를 위한 작업자 스레드 개수.
#define MAX_THREAD_CNT		4

CRITICAL_SECTION  g_cs;				//스레드 동기화 객체
std::list<SOCKET>	g_listClient;	//연결된 클라이언트 소켓 리스트.
SOCKET	g_hSocket;					//서버의 리슨 소켓
HANDLE	g_hIocp;

client socket을 USERSESSION 으로 랩핑하였다. 

작업자 스레드 개수는 GQCS를 호출하는 client의request를 처리할 스레드 갯수를 의미하고 cpu 코어 갯수에 따라 조절을 할 필요가 있다.

	//IOCP 생성
	g_hIocp = ::CreateIoCompletionPort(
		INVALID_HANDLE_VALUE,	//연결된 파일 없음.
		NULL,			//기존 핸들 없음.
		0,				//식별자(Key) 해당되지 않음.
		0);				//스레드 개수는 OS에 맡김.
	if (g_hIocp == NULL)
	{
		puts("ERORR: IOCP를 생성할 수 없습니다.");
		return 0;
	}

IOCP Queue 및 환경 관리쳬게가 생성이 된다.

	//IOCP 스레드들 생성
	HANDLE hThread;
	DWORD dwThreadID;
	for (int i = 0; i < MAX_THREAD_CNT; ++i)
	{
		dwThreadID = 0;
		//클라이언트로부터 문자열을 수신함.
		hThread = ::CreateThread(NULL,	//보안속성 상속
			0,				//스택 메모리는 기본크기(1MB)
			ThreadComplete,	//스래드로 실행할 함수이름
			(LPVOID)NULL,	//
			0,				//생성 플래그는 기본값 사용
			&dwThreadID);	//생성된 스레드ID가 저장될 변수주소

		::CloseHandle(hThread);
	}

client Request를 처리할 스레드 생성

	//서버 리슨 소켓 생성
	g_hSocket = ::WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP,
						NULL, 0, WSA_FLAG_OVERLAPPED);

	//bind()/listen()
	SOCKADDR_IN addrsvr;
	addrsvr.sin_family = AF_INET;
	addrsvr.sin_addr.S_un.S_addr = ::htonl(INADDR_ANY);
	addrsvr.sin_port = ::htons(25000);

기존 Socket함수랑 다르고 Overlapped를 지원하게 된다.

DWORD WINAPI ThreadAcceptLoop(LPVOID pParam)
{
	LPWSAOVERLAPPED	pWol = NULL;
	DWORD			dwReceiveSize, dwFlag;
	USERSESSION		*pNewUser;
	int				nAddrSize = sizeof(SOCKADDR);
	WSABUF			wsaBuf;
	SOCKADDR		ClientAddr;
	SOCKET			hClient;
	int				nRecvResult = 0;

	while ((hClient = ::accept(g_hSocket,
						&ClientAddr, &nAddrSize)) != INVALID_SOCKET)
	{
		puts("새 클라이언트가 연결됐습니다.");
		::EnterCriticalSection(&g_cs);
		g_listClient.push_back(hClient);
		::LeaveCriticalSection(&g_cs);

		//새 클라이언트에 대한 세션 객체 생성
		pNewUser = new USERSESSION;
		::ZeroMemory(pNewUser, sizeof(USERSESSION));
		pNewUser->hSocket = hClient;

		//비동기 수신 처리를 위한 OVERLAPPED 구조체 생성.
		pWol = new WSAOVERLAPPED;
		::ZeroMemory(pWol, sizeof(WSAOVERLAPPED));

		//(연결된) 클라이언트 소켓 핸들을 IOCP에 연결.
		::CreateIoCompletionPort( (HANDLE)hClient, g_hIocp,
			(ULONG_PTR)pNewUser,		//KEY!!!
			0);

		dwReceiveSize = 0;
		dwFlag = 0;
		wsaBuf.buf = pNewUser->buffer;
		wsaBuf.len = sizeof(pNewUser->buffer);

		//클라이언트가 보낸 정보를 비동기 수신한다.
		nRecvResult = ::WSARecv(hClient, &wsaBuf, 1, &dwReceiveSize,
							&dwFlag, pWol, NULL);
		if (::WSAGetLastError() != WSA_IO_PENDING)
			puts("ERROR: WSARecv() != WSA_IO_PENDING");
	}

	return 0;
}

CreateIoCompletionPort = iocp Queue에 hclient를 감시해달라 요청하는 역할이다.

pNewUser 가 key역할을 한다. malloc (new USERSESSION) 을 통해 새로운 주소를 KEY로 쓰는것이다.
이런방식으로 key를 많이 쓴다

wsaBuf를 WSARecv 를 통해 socket을 통해 들어올 데이터를 저장해야할 장소를 지정해준다.
마지막 인자가 Null인데 보통 완료시에 호출할 callback을 넣어주지만 이 경우에는 IOCP라는 구조에 의해서 호출이 되게된다.

DWORD WINAPI ThreadComplete(LPVOID pParam)
{
	DWORD			dwTransferredSize = 0;
	DWORD			dwFlag = 0;
	USERSESSION		*pSession = NULL;
	LPWSAOVERLAPPED	pWol = NULL;
	BOOL			bResult;

	puts("[IOCP 작업자 스레드 시작]");
	while (1)
	{
		bResult = ::GetQueuedCompletionStatus(
			g_hIocp,				//Dequeue할 IOCP 핸들.
			&dwTransferredSize,		//수신한 데이터 크기.
			(PULONG_PTR)&pSession,	//수신된 데이터가 저장된 메모리
			&pWol,					//OVERLAPPED 구조체.
			INFINITE);				//이벤트를 무한정 대기.

		if (bResult == TRUE)
		{
			//정상적인 경우.

			/////////////////////////////////////////////////////////////
			//1. 클라이언트가 소켓을 정상적으로 닫고 연결을 끊은 경우.
			if (dwTransferredSize == 0)
			{
				
				CloseClient(pSession->hSocket);
				delete pWol;
				delete pSession;
				puts("\tGQCS: 클라이언트가 정상적으로 연결을 종료함.");
			}

			/////////////////////////////////////////////////////////////
			//2. 클라이언트가 보낸 데이터를 수신한 경우.
			else
			{
				SendMessageAll(pSession->buffer, dwTransferredSize);
				memset(pSession->buffer, 0, sizeof(pSession->buffer));

				//다시 IOCP에 등록.
				DWORD dwReceiveSize	= 0;
				DWORD dwFlag		= 0;
				WSABUF wsaBuf		= { 0 };
				wsaBuf.buf = pSession->buffer;
				wsaBuf.len = sizeof(pSession->buffer);

				::WSARecv(
					pSession->hSocket,	//클라이언트 소켓 핸들
					&wsaBuf,			//WSABUF 구조체 배열의 주소
					1,					//배열 요소의 개수
					&dwReceiveSize,
					&dwFlag,
					pWol,
					NULL);
				if (::WSAGetLastError() != WSA_IO_PENDING)
					puts("\tGQCS: ERROR: WSARecv()");
			}
		}
		else
		{
			//비정상적인 경우.

			/////////////////////////////////////////////////////////////
			//3. 완료 큐에서 완료 패킷을 꺼내지 못하고 반환한 경우.
			if (pWol == NULL)
			{
				//IOCP 핸들이 닫힌 경우(서버를 종료하는 경우)도 해당된다.
				puts("\tGQCS: IOCP 핸들이 닫혔습니다.");
				break;
			}

			/////////////////////////////////////////////////////////////
			//4. 클라이언트가 비정상적으로 종료됐거나
			//   서버가 먼저 연결을 종료한 경우.
			else
			{
				if (pSession != NULL)
				{
					CloseClient(pSession->hSocket);
					delete pWol;
					delete pSession;
				}

				puts("\tGQCS: 서버 종료 혹은 비정상적 연결 종료");
			}
		}
	}

	puts("[IOCP 작업자 스레드 종료]");
	return 0;
}

worker thread가 호출할 함수이다.

GQCS 를 call하면서 alterable wait으로 빠지게 된다.

위에서 key로 New Userssion으로 만든 memory주소 값을 통해서 여기서는 session 포인터로 참조하여 사용자를 구별 할 수 있게 된다.

client의 data를 수신한후에 다시 동일하게 IOCP에게 client socket을 등록하는 과정이 존재한다.