
Опытный
 
Профиль
Группа: Участник
Сообщений: 427
Регистрация: 13.6.2007
Где: Молдова, Кишинев
Репутация: нет Всего: 4
|
Доброе время суток. Работаю на "Borland C++ Builder 2009". # Задача: Необходимо написать серверное многопоточное приложение для игрового сервера MMORPG, которое должно поддерживать 1000 подключений одновременно. - Задачу решаю через порты завершения (I/O Сompletion Ports) # Проблема: Мне удаётся в моём примере считывать данные с функции WSARecv() лишь один раз, все последующие возвращают значение -1. - WSAGetLastError() возвращает код 997 (WSA_IO_PENDING) # .cpp:Код | #include <vcl.h> #include "TSocket.h" //--------------------------------------------------------------------------- char *g_Caption = "[Message] CP Server"; //--------------------------------------------------------------------------- vector<LPClientContext> g_list; unsigned long g_CID = 0; // ---- AnsiString g_FileDir = "", g_LogFileName = ""; //--------------------------------------------------------------------------- CRITICAL_SECTION g_cs; //--------------------------------------------------------------------------- #define g_WaitForSingleObjectValueListenThread 100 #define g_WaitForMultipleEventsValueListenThread 100 //--------------------------------------------------------------------------- #define RECV_POSTED 0 #define SEND_POSTED 1 #define ACCEPT_POSTED 2 #define DISCONNECT_POSTED 3 //---------------------------------------------------------------------------
TSocket::TSocket() { InitializeCriticalSection(&g_cs); // ---- g_list.reserve(1000); // ---- g_FileDir = ExtractFilePath(Application->ExeName); g_LogFileName = g_FileDir + "Logs\\MyLog.log"; // ---- //GetSystemInfo(&g_SystemInfo); //g_CompletionPort = CreateNewCompletionPort(g_SystemInfo.dwNumberOfProcessors * 2); CreateIOPortAndThreads(g_CompletionPort, g_SystemInfo); } //---------------------------------------------------------------------------
TSocket::~TSocket() { DisconnectAll(); // ---- RemoveIOPortAndThreads(g_CompletionPort, g_SystemInfo); // ---- DeleteCriticalSection(&g_cs); // ---- // Clean up and exit. WSACleanup(); } //---------------------------------------------------------------------------
HANDLE CreateNewCompletionPort(DWORD dwNumberOfConcurrentThreads) { return CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, dwNumberOfConcurrentThreads); } //---------------------------------------------------------------------------
bool AssociateSocketWithCompletionPort(SOCKET socket, HANDLE hCompletionPort, DWORD dwCompletionKey) { HANDLE h = CreateIoCompletionPort((HANDLE)socket, hCompletionPort, dwCompletionKey, 0); return (h == hCompletionPort); } //---------------------------------------------------------------------------
bool __fastcall TSocket::CreateIOPortAndThreads(HANDLE &CompletionPort, SYSTEM_INFO &SystemInfo) { bool Result = false; // Шаг 1: Создать порт завершения Ввода - вывода; CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); if(CompletionPort == NULL) return Result; // Шаг 2: Определяем, сколько процессоров находятся в системе; GetSystemInfo(&SystemInfo); // Шаг 3: Создаем рабочие потоки, основанные на числе процессоров, доступных в системе. // Для нашего случая мы создаем один рабочий поток для каждого процессора. unsigned long maxThr = SystemInfo.dwNumberOfProcessors * 2; // ---- for(unsigned long i = 0; i < maxThr; i++) { HANDLE ThreadHandle; DWORD ThreadId; // Создаем рабочий поток сервера, и передаем порт завершения в поток. // Замечание: вариант содержания функции ServerWorkerThread() определен ниже. ThreadHandle = CreateThread(NULL, 0, ServerWorkerThread, (void*)this, 0, &ThreadId); // Закрываем дескриптор потока; CloseHandle(ThreadHandle); } // ---- Result = true; // ---- return Result; } //---------------------------------------------------------------------------
bool __fastcall TSocket::RemoveIOPortAndThreads(HANDLE &CompletionPort, SYSTEM_INFO &SystemInfo) { bool Result = false; int nRet = 0; // ---- // # Завершение всех рабочих потоков порта завершения; nRet = PostQueuedCompletionStatus(CompletionPort, 0, 0, NULL); if(nRet == 0) return Result; // # Закрытие порта завырешния; nRet = CloseHandle(CompletionPort); if(nRet == 0) return Result; // ---- Result = true; // ---- return Result; } //---------------------------------------------------------------------------
void __fastcall TSocket::AddNewElement(SOCKET sSocket) { ClientContext pContext; memset(&pContext, 0, sizeof(ClientContext)); // ---- pContext.sListen = sSocket; // ---- EnterCriticalSection(&g_cs); //try { g_list.push_back(pContext); } catch(...) {} LeaveCriticalSection(&g_cs); } //---------------------------------------------------------------------------
bool __fastcall TSocket::CreateWS_TCP(unsigned short cPort) { bool Result = false; // ---- int nRes = 0; int ErrorCode = 0; String sText = ""; WSADATA wsaData; UINT threadID; // ---- nRes = WSAStartup(MAKEWORD(2, 2), &wsaData); if(nRes != 0) { ErrorCode = WSAGetLastError(); sText = "WSAStartup() failed with error: " + (String)ErrorCode; SMBA(g_Caption, sText); return Result; } // ---- g_sListen_TCP = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED); if(g_sListen_TCP == INVALID_SOCKET) { ErrorCode = WSAGetLastError(); sText = "WSASocket() failed with error: " + (String)ErrorCode; SMBA(g_Caption, sText); return Result; } // ---- g_hEvent = WSA_INVALID_EVENT; g_hEvent = WSACreateEvent(); if (g_hEvent == WSA_INVALID_EVENT) { ErrorCode = WSAGetLastError(); closesocket(g_sListen_TCP); sText = "WSACreateEvent() failed with error: " + (String)ErrorCode; SMBA(g_Caption, sText); return Result; } // ---- nRes = WSAEventSelect(g_sListen_TCP, g_hEvent, FD_ACCEPT); if (nRes == SOCKET_ERROR) { ErrorCode = WSAGetLastError(); closesocket(g_sListen_TCP); sText = "WSAEventSelect() failed with error: " + (String)ErrorCode; SMBA(g_Caption, sText); return Result; } // ---- sockaddr_in a; a.sin_family = AF_INET; a.sin_port = htons(cPort); a.sin_addr.S_un.S_addr = htonl(INADDR_ANY); // ---- nRes = bind(g_sListen_TCP, (sockaddr*)&a, sizeof(sockaddr)); if(nRes == SOCKET_ERROR) { ErrorCode = WSAGetLastError(); closesocket(g_sListen_TCP); sText = "bind() to the port '" + (String)cPort + "' failed with error: " + (String)ErrorCode; SMBA(g_Caption, sText); return Result; } // ---- nRes = listen(g_sListen_TCP, 5); if(nRes == SOCKET_ERROR) { ErrorCode = WSAGetLastError(); closesocket(g_sListen_TCP); sText = "listen() failed with error: " + (String)ErrorCode; SMBA(g_Caption, sText); return Result; } // ----- g_hThread = (HANDLE)_beginthreadex(NULL, // Security 0, // Stack size - use default; ListenThreadProc, // Thread fn entry point; (void*) this, 0, // Init flag; &threadID); // Thread address; // ---- Result = true; // ---- return Result; } //---------------------------------------------------------------------------
DWORD WINAPI ServerWorkerThread(LPVOID thisContext) { TSocket* pThis = reinterpret_cast<TSocket*>(thisContext); // ---- HANDLE CompletionPort = pThis->g_CompletionPort; // ---- DWORD BytesTransferred = 0; DWORD Flags = 0; BOOL bResult; // ---- int itype = 0; // ---- //LPClientContext lpClientContext; LPClientContext lpClientContext; LPWSAOVERLAPPED lpWSAOverlapped; // ---- while(TRUE) { lpClientContext = NULL; // ---- // Ожидаем завершения операции ввода/вывода на любом сокете, // связанным с данным портом завершения; bResult = GetQueuedCompletionStatus(CompletionPort, &BytesTransferred, (LPDWORD)&lpClientContext, &lpWSAOverlapped, INFINITE); // ---- if(!bResult || lpClientContext == NULL || lpWSAOverlapped == NULL) { break; //continue; } // ---- // Если произошла ошибка типа BytesTransferred=0, что свидетельствует о // закрытии сокета на удаленном хосте, закрываем свой сокет и очищаем данные, // связанные с сокетом; if (BytesTransferred <= 0) { TCloseSocket(lpClientContext->sListen, ""); //closesocket(lpClientContext->sListen); GlobalFree(lpClientContext); GlobalFree(lpWSAOverlapped); continue; // Продолжаем цикл; } // ---- // Обслуживаем завершенный запрос. Какая операция была закончена, определяем по // содержимому поля OperationTypefield в структуре PerIoData; switch(lpClientContext->OperationType) { case SEND_POSTED: { // Настройка данных операции // для следующего запроса перекрытого ввода-вывода //ZeroMemory(&lpClientContext->WSASendOverlapped, sizeof(WSAOVERLAPPED)); lpClientContext->OperationType = RECV_POSTED; // ---- lpClientContext->iSendRes = WSASend(lpClientContext->sListen, &lpClientContext->wsaOutBuf, 1, &lpClientContext->dwSendBytes, 0, &lpClientContext->WSASendOverlapped, NULL); } break; case RECV_POSTED: { if(lpClientContext->dwRecvBytes > 0) { SendDisassemblePacket(lpClientContext, lpClientContext->dwRecvBytes); ZeroMemory(lpClientContext->InBuf, lpClientContext->dwRecvBytes); lpClientContext->dwRecvBytes = 0; } // Обработка принятых данных // в буфере PerIoData->Buffer // Отправка нового запроса ввода-вывода WSASend или WSARecv. // В качестве примера отправим еще один асинхронный запрос WSARecvO Flags = 0; // Настройка данных операции // для следующего запроса перекрытого ввода-вывода ZeroMemory(&lpClientContext->WSARecvOverlapped, sizeof(WSAOVERLAPPED)); lpClientContext->WSARecvOverlapped.hEvent = lpClientContext->hReadEvent; // ----- lpClientContext->wsaInBuf.len = 10240; lpClientContext->wsaInBuf.buf = lpClientContext->InBuf; lpClientContext->OperationType = RECV_POSTED; // Выполняем вызов WSARecv() и переходим опять к ожиданию завершения; lpClientContext->iRecvRes = WSARecv(lpClientContext->sListen, &lpClientContext->wsaInBuf, 1, &lpClientContext->dwRecvBytes, &Flags, &lpClientContext->WSARecvOverlapped, NULL); // ---- if(lpClientContext->iRecvRes == SOCKET_ERROR) { lpClientContext->ErrorCode = WSAGetLastError(); if(lpClientContext->ErrorCode != WSA_IO_PENDING) { pThis->SMBA("IOCP System", "lpClientContext->ErrorCode != WSA_IO_PENDING"); int asd = 123; // ---- } } } break; default: break; } // End Switch; } // End While; } // End ServerWorkerThread(); //---------------------------------------------------------------------------
void SendDisassemblePacket(LPClientContext lpClientContext, int len) { .... } //---------------------------------------------------------------------------
void __fastcall TSocket::SMBA(String sCaption, String sText) { Application->MessageBoxA(sText.w_str(), sCaption.w_str(), 0); } //---------------------------------------------------------------------------
void TCloseSocket(SOCKET Socket, AnsiString SendText) { try { shutdown(Socket, SD_BOTH); closesocket(Socket); } catch(...) {} // ---- WLogF("TCloseSocket: " + SendText, g_FileDir); } //---------------------------------------------------------------------------
void WLogF(AnsiString TextSend, AnsiString FileDir) { if(FileDir == NULL || FileDir.Length() < 1) return; // ---- FILE *pFile = NULL; char buf[255]; memset(&buf, 0, sizeof(buf)); // ---- AnsiString TSend = "[" + Time().TimeString() + "] " + TextSend + "\n"; strcpy(buf, TSend.c_str()); // ---- try { ForceDirectories(FileDir + "Logs"); // ---- pFile = fopen(g_LogFileName.c_str(), "a+"); fprintf (pFile, buf); fclose(pFile); } catch(...) {} } //---------------------------------------------------------------------------
void __fastcall TSocket::OnAccept() { if (g_bTimeToKill || g_bDisconnectAll) return; // ---- SOCKET ClientSocket; SOCKADDR_IN SockAddr; IN_ADDR SockInAddr; // ---- int SockAddrLen = sizeof(SOCKADDR); int nLen = sizeof(SOCKADDR_IN); AnsiString cIP = ""; int ErrorCode = 0; int nRes = 0; DWORD Flags = 0; // ---- // Accept() the new socket descriptor; //ClientSocket = accept(g_sListen_TCP, (LPSOCKADDR)&SockAddr, &nLen); ClientSocket = WSAAccept(g_sListen_TCP, (sockaddr*)&SockAddr, &SockAddrLen, NULL, 0); // ---- if (ClientSocket == SOCKET_ERROR) { ErrorCode = WSAGetLastError(); if (ErrorCode != WSAEWOULDBLOCK) { // Just log the error and return; return; } } // ----- memcpy(&SockInAddr, &SockAddr.sin_addr, sizeof(SockInAddr)); cIP = inet_ntoa(SockInAddr); // ---- LPClientContext pContext = new ClientContext; ZeroMemory(pContext, sizeof(ClientContext)); // ---- pContext->sListen = ClientSocket; pContext->InBuf = new BYTE[10240]; pContext->OutBuf = new BYTE[10240]; ZeroMemory(pContext->InBuf, 10240); ZeroMemory(pContext->OutBuf, 10240); pContext->wsaInBuf.buf = pContext->InBuf; pContext->wsaInBuf.len = 10240; pContext->wsaOutBuf.buf = pContext->OutBuf; pContext->wsaOutBuf.len = 10240; pContext->dwRecvBytes = 0; pContext->dwSendBytes = 0; // ---- pContext->hReadEvent = CreateEvent(NULL, TRUE, TRUE, NULL); // ---- ZeroMemory(&pContext->WSARecvOverlapped, sizeof(WSAOVERLAPPED)); pContext->WSARecvOverlapped.hEvent = pContext->hReadEvent; // ---- AssociateSocketWithCompletionPort(pContext->sListen, g_CompletionPort, (DWORD)pContext); // ---- EnterCriticalSection(&g_cs); try { g_list.push_back(pContext); } catch(...) {} LeaveCriticalSection(&g_cs); // ---- ///* pContext->iRecvRes = WSARecv(pContext->sListen, &pContext->wsaInBuf, 1, &pContext->dwRecvBytes, &Flags, &pContext->WSARecvOverlapped, NULL); //*/ // ---- /* memcpy(pContext->OutBuf, "\xC1\x04\x00\x01", 4); pContext->wsaOutBuf.len = 4; pContext->iSendRes = WSASend(pContext->sListen, &pContext->wsaOutBuf, 1, &pContext->dwSendBytes, 0, &pContext->WSASendOverlapped, NULL); */ } //---------------------------------------------------------------------------
unsigned __stdcall ListenThreadProc(LPVOID lParam) { TSocket* pThis = reinterpret_cast<TSocket*>(lParam); // ---- DWORD dwRet = 0; int nRes = 0; int ErrorCode = 0; WSANETWORKEVENTS events; // ---- while(!pThis->bShouldStopListenThread) { if (WaitForSingleObject(pThis->g_hKillEvent, g_WaitForSingleObjectValueListenThread) == WAIT_OBJECT_0) { break; } // ---- dwRet = WSAWaitForMultipleEvents(1, &pThis->g_hEvent, FALSE, g_WaitForMultipleEventsValueListenThread, FALSE); // ---- if (dwRet == WSA_WAIT_TIMEOUT) continue; // ---- if (pThis->bShouldStopListenThread) break; // ---- // Figure out what happened nRes = WSAEnumNetworkEvents(pThis->g_sListen_TCP, pThis->g_hEvent, &events); if (nRes == SOCKET_ERROR) { ErrorCode = WSAGetLastError(); break; } // ---- if (pThis->bShouldStopListenThread) break; // ---- if(events.lNetworkEvents & FD_ACCEPT) { if (events.iErrorCode[FD_ACCEPT_BIT] == 0) { pThis->OnAccept(); } else { ErrorCode = WSAGetLastError(); break; } } } // while.... // ---- TCloseSocket(pThis->g_sListen_TCP, ""); // ---- return 0; // Normal Thread Exit Code... } //---------------------------------------------------------------------------
|
# .h:Код | #include <windows.h> #include <stdio.h> #include <vector> using namespace std; // ---- #pragma comment(lib, "ws2_32.lib") #include <winsock2.h> // ---- #include <process.h> // _beginthread; _endthread; //--------------------------------------------------------------------------- #include <Sockets.hpp> //--------------------------------------------------------------------------- #define MaxBuffer 10240 //--------------------------------------------------------------------------- typedef struct { SOCKET sListen; SOCKET sClient; // Input Elements for Winsock WSABUF wsaInBuf; BYTE *InBuf; DWORD dwRecvBytes; int iRecvRes; WSAOVERLAPPED WSARecvOverlapped; // Output elements for Winsock WSABUF wsaOutBuf; BYTE *OutBuf; DWORD dwSendBytes; int iSendRes; WSAOVERLAPPED WSASendOverlapped; // ---- int OperationType; int ErrorCode; HANDLE hReadEvent; } ClientContext, *LPClientContext; //---------------------------------------------------------------------------
class TSocket { friend unsigned __stdcall ListenThreadProc(LPVOID lParam); friend DWORD WINAPI ServerWorkerThread(LPVOID thisContext); // ---- private: SOCKET g_sListen_TCP; // ---- WSAEVENT g_hEvent; HANDLE g_hKillEvent; HANDLE g_hThread; bool bShouldStopListenThread; // ---- bool g_bDisconnectAll; bool g_bTimeToKill; // ---- HANDLE g_CompletionPort; SYSTEM_INFO g_SystemInfo; public: TSocket(); ~TSocket(); // ---- bool __fastcall CreateWS_TCP(WORD _Port); // ---- void __fastcall AddNewElement(SOCKET ClientSocket); // ---- bool __fastcall CreateIOPortAndThreads(HANDLE &CompletionPort, SYSTEM_INFO &SystemInfo); bool __fastcall RemoveIOPortAndThreads(HANDLE &CompletionPort, SYSTEM_INFO &SystemInfo); // ---- void __fastcall OnAccept(); // ---- void __fastcall SMBA(String _Caption, String _Text); }; //--------------------------------------------------------------------------- void TCloseSocket(SOCKET Socket, AnsiString SendText); void WLogF(AnsiString TextSend, AnsiString FileDir); // ---- void SendDisassemblePacket(LPClientContext lpClientContext, int len); //--------------------------------------------------------------------------- DWORD WINAPI ServerWorkerThread(LPVOID thisContext); unsigned __stdcall ListenThreadProc(LPVOID lParam); //--------------------------------------------------------------------------- HANDLE CreateNewCompletionPort(DWORD dwNumberOfConcurrentThreads); bool AssociateSocketWithCompletionPort(SOCKET socket, HANDLE hCompletionPort, DWORD dwCompletionKey); //---------------------------------------------------------------------------
|
P.S. -> Третий день играюсь и никак не могу понять в чём проблема =( Это сообщение отредактировал(а) MuForum - 27.8.2009, 18:28
--------------------
"Чтобы правильно задать вопрос, нужно знать большую часть ответа!" (Р. Шекли)
|