From 0e33c919dd5c954e0e9d266924c1650237bb95a1 Mon Sep 17 00:00:00 2001 From: "madmaxoft@gmail.com" Date: Sun, 26 Feb 2012 00:36:51 +0000 Subject: Using cSocketThreads for client outgoing packets. Unfortunately had to put in one intermediate thread (cServer::cNotifyWriteThread) to avoid deadlocks. Still, seems we have a proper multithreading for clients and no more per-client threads, yay :) git-svn-id: http://mc-server.googlecode.com/svn/trunk@328 0a769ca7-a7f5-676a-18bf-c427514a06d6 --- source/cClientHandle.cpp | 188 ++++++++++++++++++----------------------------- 1 file changed, 73 insertions(+), 115 deletions(-) (limited to 'source/cClientHandle.cpp') diff --git a/source/cClientHandle.cpp b/source/cClientHandle.cpp index 9eed31a3a..41d2fda6c 100644 --- a/source/cClientHandle.cpp +++ b/source/cClientHandle.cpp @@ -77,7 +77,8 @@ case 2: (z)-=(amount); break; case 3: (z)+=(amount); break;\ case 4: (x)-=(amount); break; case 5: (x)+=(amount); break; } -#define MAX_SEMAPHORES (2000) +/// If the number of queued outgoing packets reaches this, the client will be kicked +#define MAX_OUTGOING_PACKETS 2000 @@ -89,9 +90,7 @@ cClientHandle::cClientHandle(const cSocket & a_Socket, int a_ViewDistance) : m_ViewDistance(a_ViewDistance) , m_ProtocolVersion(23) - , m_pSendThread(NULL) , m_Socket(a_Socket) - , m_Semaphore(MAX_SEMAPHORES) , m_bDestroyed(false) , m_Player(NULL) , m_bKicking(false) @@ -135,11 +134,6 @@ cClientHandle::cClientHandle(const cSocket & a_Socket, int a_ViewDistance) m_PacketMap[E_RESPAWN] = new cPacket_Respawn; m_PacketMap[E_PING] = new cPacket_Ping; - ////////////////////////////////////////////////////////////////////////// - m_pSendThread = new cThread(SendThread, this, "cClientHandle::SendThread"); - m_pSendThread->Start (true); - ////////////////////////////////////////////////////////////////////////// - LOG("New ClientHandle created at %p", this); } @@ -149,7 +143,7 @@ cClientHandle::cClientHandle(const cSocket & a_Socket, int a_ViewDistance) cClientHandle::~cClientHandle() { - LOG("Deleting client \"%s\"", GetUsername().c_str()); + LOG("Deleting client \"%s\" at %p", GetUsername().c_str(), this); // Remove from cSocketThreads, we're not to be called anymore: cRoot::Get()->GetServer()->ClientDestroying(this); @@ -173,20 +167,13 @@ cClientHandle::~cClientHandle() } } - // First stop sending thread - m_bKeepThreadGoing = false; - if (m_Socket.IsValid()) { cPacket_Disconnect Disconnect; Disconnect.m_Reason = "Server shut down? Kthnxbai"; m_Socket.Send(&Disconnect); - m_Socket.CloseSocket(); } - - m_Semaphore.Signal(); - delete m_pSendThread; - + if (m_Player != NULL) { m_Player->SetClientHandle(NULL); @@ -198,19 +185,31 @@ cClientHandle::~cClientHandle() delete m_PacketMap[i]; } + // Queue all remaining outgoing packets to cSocketThreads: { - cCSLock Lock(m_SendCriticalSection); + cCSLock Lock(m_CSPackets); for (PacketList::iterator itr = m_PendingNrmSendPackets.begin(); itr != m_PendingNrmSendPackets.end(); ++itr) { + AString Data; + (*itr)->Serialize(Data); + cRoot::Get()->GetServer()->WriteToClient(&m_Socket, Data); delete *itr; } + m_PendingNrmSendPackets.clear(); for (PacketList::iterator itr = m_PendingLowSendPackets.begin(); itr != m_PendingLowSendPackets.end(); ++itr) { + AString Data; + (*itr)->Serialize(Data); + cRoot::Get()->GetServer()->WriteToClient(&m_Socket, Data); delete *itr; } + m_PendingLowSendPackets.clear(); } - LOG("ClientHandle at %p destroyed", this); + // Queue the socket to close as soon as it sends all outgoing data: + cRoot::Get()->GetServer()->QueueClientClose(&m_Socket); + + LOG("ClientHandle at %p deleted", this); } @@ -295,8 +294,8 @@ void cClientHandle::Authenticate(void) Send(Health); m_Player->Initialize(World); - m_State = csDownloadingWorld; StreamChunks(); + m_State = csDownloadingWorld; } @@ -305,7 +304,7 @@ void cClientHandle::Authenticate(void) void cClientHandle::StreamChunks(void) { - if (m_State < csDownloadingWorld) + if (m_State < csAuthenticating) { return; } @@ -323,7 +322,7 @@ void cClientHandle::StreamChunks(void) m_LastStreamedChunkZ = ChunkPosZ; // DEBUG: - LOGINFO("Streaming chunks centered on [%d, %d]", ChunkPosX, ChunkPosZ); + LOGINFO("Streaming chunks centered on [%d, %d], view distance %d", ChunkPosX, ChunkPosZ, m_ViewDistance); cWorld * World = m_Player->GetWorld(); ASSERT(World != NULL); @@ -1645,7 +1644,10 @@ void cClientHandle::Tick(float a_Dt) if (m_State >= csDownloadingWorld) { cWorld * World = m_Player->GetWorld(); + cCSLock Lock(m_CSChunkLists); + + // Send the chunks: int NumSent = 0; for (cChunkCoordsList::iterator itr = m_ChunksToSend.begin(); itr != m_ChunksToSend.end();) { @@ -1662,7 +1664,8 @@ void cClientHandle::Tick(float a_Dt) break; } } // for itr - m_ChunksToSend[] - + Lock.Unlock(); + // Check even if we didn't send anything - a chunk may have sent a notification that we'd miss otherwise CheckIfWorldDownloaded(); } @@ -1707,8 +1710,7 @@ void cClientHandle::Send(const cPacket * a_Packet, ENUM_PRIORITY a_Priority /* = } } - bool bSignalSemaphore = true; - cCSLock Lock(m_SendCriticalSection); + cCSLock Lock(m_CSPackets); if (a_Priority == E_PRIORITY_NORMAL) { if (a_Packet->m_PacketID == E_REL_ENT_MOVE_LOOK) @@ -1727,7 +1729,6 @@ void cClientHandle::Send(const cPacket * a_Packet, ENUM_PRIORITY a_Priority /* = { Packets.erase(itr); bBreak = true; - bSignalSemaphore = false; // Because 1 packet is removed, semaphore count is the same delete PacketData; break; } @@ -1747,10 +1748,9 @@ void cClientHandle::Send(const cPacket * a_Packet, ENUM_PRIORITY a_Priority /* = m_PendingLowSendPackets.push_back(a_Packet->Clone()); } Lock.Unlock(); - if (bSignalSemaphore) - { - m_Semaphore.Signal(); - } + + // Notify SocketThreads that we have something to write: + cRoot::Get()->GetServer()->NotifyClientWrite(this); } @@ -1797,90 +1797,6 @@ void cClientHandle::SendConfirmPosition(void) -void cClientHandle::SendThread(void *lpParam) -{ - cClientHandle* self = (cClientHandle*)lpParam; - PacketList & NrmSendPackets = self->m_PendingNrmSendPackets; - PacketList & LowSendPackets = self->m_PendingLowSendPackets; - - - while (self->m_bKeepThreadGoing && self->m_Socket.IsValid()) - { - self->m_Semaphore.Wait(); - cCSLock Lock(self->m_SendCriticalSection); - if (NrmSendPackets.size() + LowSendPackets.size() > MAX_SEMAPHORES) - { - LOGERROR("ERROR: Too many packets in queue for player %s !!", self->m_Username.c_str()); - cPacket_Disconnect DC("Too many packets in queue."); - self->m_Socket.Send(DC); - - cSleep::MilliSleep(1000); // Give packet some time to be received - - Lock.Unlock(); - self->Destroy(); - break; - } - - if (NrmSendPackets.size() == 0 && LowSendPackets.size() == 0) - { - ASSERT(!self->m_bKeepThreadGoing); - if (self->m_bKeepThreadGoing) - { - LOGERROR("ERROR: Semaphore was signaled while no packets to send"); - } - continue; - } - if (NrmSendPackets.size() > MAX_SEMAPHORES / 2) - { - LOGINFO("Pending packets: %i Last: 0x%02x", NrmSendPackets.size(), (*NrmSendPackets.rbegin())->m_PacketID); - } - - cPacket * Packet = NULL; - if (!NrmSendPackets.empty()) - { - Packet = *NrmSendPackets.begin(); - NrmSendPackets.erase(NrmSendPackets.begin()); - } - else if (!LowSendPackets.empty()) - { - Packet = *LowSendPackets.begin(); - LowSendPackets.erase(LowSendPackets.begin()); - } - Lock.Unlock(); - - if (!self->m_Socket.IsValid()) - { - break; - } - - // LOG("Sending packet 0x%02x to \"%s\" (\"%s\")", Packet->m_PacketID, self->m_Socket.GetIPString().c_str(), self->m_Username.c_str()); - - bool bSuccess = self->m_Socket.Send(Packet); - - if (!bSuccess) - { - LOGERROR("ERROR: While sending packet 0x%02x to client \"%s\"", Packet->m_PacketID, self->m_Username.c_str()); - delete Packet; - self->Destroy(); - break; - } - delete Packet; - - if (self->m_bKicking && (NrmSendPackets.size() + LowSendPackets.size() == 0)) // Disconnect player after all packets have been sent - { - cSleep::MilliSleep(1000); // Give all packets some time to be received - self->Destroy(); - break; - } - } - - return; -} - - - - - const AString & cClientHandle::GetUsername(void) const { return m_Username; @@ -1967,7 +1883,49 @@ void cClientHandle::GetOutgoingData(AString & a_Data) { // Data can be sent to client - // TODO + cCSLock Lock(m_CSPackets); + if (m_PendingNrmSendPackets.size() + m_PendingLowSendPackets.size() > MAX_OUTGOING_PACKETS) + { + LOGERROR("ERROR: Too many packets in queue for player %s !!", m_Username.c_str()); + cPacket_Disconnect DC("Too many packets in queue."); + m_Socket.Send(DC); + Lock.Unlock(); + Destroy(); + return; + } + + if ((m_PendingNrmSendPackets.size() == 0) && (m_PendingLowSendPackets.size() == 0)) + { + return; + } + + if (m_PendingNrmSendPackets.size() > MAX_OUTGOING_PACKETS / 2) + { + LOGINFO("Suspiciously many pending packets: %i; client \"%s\", LastType: 0x%02x", m_PendingNrmSendPackets.size(), m_Username.c_str(), (*m_PendingNrmSendPackets.rbegin())->m_PacketID); + } + + AString Data; + if (!m_PendingNrmSendPackets.empty()) + { + m_PendingNrmSendPackets.front()->Serialize(Data); + delete m_PendingNrmSendPackets.front(); + m_PendingNrmSendPackets.erase(m_PendingNrmSendPackets.begin()); + } + else if (!m_PendingLowSendPackets.empty()) + { + m_PendingLowSendPackets.front()->Serialize(Data); + delete m_PendingLowSendPackets.front(); + m_PendingLowSendPackets.erase(m_PendingLowSendPackets.begin()); + } + Lock.Unlock(); + + a_Data.append(Data); + + // Disconnect player after all packets have been sent + if (m_bKicking && (m_PendingNrmSendPackets.size() + m_PendingLowSendPackets.size() == 0)) + { + Destroy(); + } } -- cgit v1.2.3