From cc1284a753874aa7d351a6ea41aaec4662ca6e57 Mon Sep 17 00:00:00 2001 From: madmaxoft Date: Mon, 27 Jan 2014 21:27:13 +0100 Subject: Rewritten networking to use non-blocking sockets. This fixes #592. --- src/OSSupport/Socket.cpp | 31 +++++++- src/OSSupport/Socket.h | 9 +++ src/OSSupport/SocketThreads.cpp | 172 +++++++++++++++++++++++++++------------- src/OSSupport/SocketThreads.h | 26 ++++-- 4 files changed, 176 insertions(+), 62 deletions(-) (limited to 'src') diff --git a/src/OSSupport/Socket.cpp b/src/OSSupport/Socket.cpp index 4226a7535..3248eed88 100644 --- a/src/OSSupport/Socket.cpp +++ b/src/OSSupport/Socket.cpp @@ -320,7 +320,7 @@ bool cSocket::ConnectIPv4(const AString & a_HostNameOrAddr, unsigned short a_Por -int cSocket::Receive(char* a_Buffer, unsigned int a_Length, unsigned int a_Flags) +int cSocket::Receive(char * a_Buffer, unsigned int a_Length, unsigned int a_Flags) { return recv(m_Socket, a_Buffer, a_Length, a_Flags); } @@ -354,3 +354,32 @@ unsigned short cSocket::GetPort(void) const + +void cSocket::SetNonBlocking(void) +{ + #ifdef _WIN32 + u_long NonBlocking = 1; + int res = ioctlsocket(m_Socket, FIONBIO, &NonBlocking); + if (res != 0) + { + LOGERROR("Cannot set socket to non-blocking. This would make the server deadlock later on, aborting.\nErr: %d, %d, %s", + res, GetLastError(), GetLastErrorString().c_str() + ); + abort(); + } + #else + int NonBlocking = 1; + int res = ioctl(m_Socket, FIONBIO, (char *)&NonBlocking); + if (res != 0) + { + LOGERROR("Cannot set socket to non-blocking. This would make the server deadlock later on, aborting.\nErr: %d, %d, %s", + res, GetLastError(), GetLastErrorString().c_str() + ); + abort(); + } + #endif +} + + + + diff --git a/src/OSSupport/Socket.h b/src/OSSupport/Socket.h index 4ca3d61f4..bdc2babf5 100644 --- a/src/OSSupport/Socket.h +++ b/src/OSSupport/Socket.h @@ -24,6 +24,12 @@ public: { IPv4 = AF_INET, IPv6 = AF_INET6, + + #ifdef _WIN32 + ErrWouldBlock = WSAEWOULDBLOCK, + #else + ErrWouldBlock = EWOULDBLOCK, + #endif } ; #ifdef _WIN32 @@ -110,6 +116,9 @@ public: unsigned short GetPort(void) const; // Returns 0 on failure const AString & GetIPString(void) const { return m_IPString; } + + /** Sets the socket into non-blocking mode */ + void SetNonBlocking(void); private: xSocket m_Socket; diff --git a/src/OSSupport/SocketThreads.cpp b/src/OSSupport/SocketThreads.cpp index 74932daf8..3e2631733 100644 --- a/src/OSSupport/SocketThreads.cpp +++ b/src/OSSupport/SocketThreads.cpp @@ -175,6 +175,7 @@ void cSocketThreads::cSocketThread::AddClient(const cSocket & a_Socket, cCallbac m_Slots[m_NumSlots].m_Client = a_Client; m_Slots[m_NumSlots].m_Socket = a_Socket; + m_Slots[m_NumSlots].m_Socket.SetNonBlocking(); m_Slots[m_NumSlots].m_Outgoing.clear(); m_Slots[m_NumSlots].m_State = sSlot::ssNormal; m_NumSlots++; @@ -213,7 +214,9 @@ bool cSocketThreads::cSocketThread::RemoveClient(const cCallback * a_Client) else { // Query and queue the last batch of outgoing data: - m_Slots[i].m_Client->GetOutgoingData(m_Slots[i].m_Outgoing); + AString Data; + m_Slots[i].m_Client->GetOutgoingData(Data); + m_Slots[i].m_Outgoing.append(Data); if (m_Slots[i].m_Outgoing.empty()) { // No more outgoing data, shut the socket down immediately: @@ -386,38 +389,28 @@ void cSocketThreads::cSocketThread::Execute(void) // The main thread loop: while (!m_ShouldTerminate) { - // Put all sockets into the Read set: + // Read outgoing data from the clients: + QueueOutgoingData(); + + // Put sockets into the sets fd_set fdRead; + fd_set fdWrite; cSocket::xSocket Highest = m_ControlSocket1.GetSocket(); - - PrepareSet(&fdRead, Highest, false); + PrepareSets(&fdRead, &fdWrite, Highest); // Wait for the sockets: timeval Timeout; Timeout.tv_sec = 5; Timeout.tv_usec = 0; - if (select(Highest + 1, &fdRead, NULL, NULL, &Timeout) == -1) + if (select(Highest + 1, &fdRead, &fdWrite, NULL, &Timeout) == -1) { - LOG("select(R) call failed in cSocketThread: \"%s\"", cSocket::GetLastErrorString().c_str()); + LOG("select() call failed in cSocketThread: \"%s\"", cSocket::GetLastErrorString().c_str()); continue; } + // Perform the IO: ReadFromSockets(&fdRead); - - // Test sockets for writing: - fd_set fdWrite; - Highest = m_ControlSocket1.GetSocket(); - PrepareSet(&fdWrite, Highest, true); - Timeout.tv_sec = 0; - Timeout.tv_usec = 0; - if (select(Highest + 1, NULL, &fdWrite, NULL, &Timeout) == -1) - { - LOG("select(W) call failed in cSocketThread: \"%s\"", cSocket::GetLastErrorString().c_str()); - continue; - } - WriteToSockets(&fdWrite); - CleanUpShutSockets(); } // while (!mShouldTerminate) } @@ -426,10 +419,11 @@ void cSocketThreads::cSocketThread::Execute(void) -void cSocketThreads::cSocketThread::PrepareSet(fd_set * a_Set, cSocket::xSocket & a_Highest, bool a_IsForWriting) +void cSocketThreads::cSocketThread::PrepareSets(fd_set * a_Read, fd_set * a_Write, cSocket::xSocket & a_Highest) { - FD_ZERO(a_Set); - FD_SET(m_ControlSocket1.GetSocket(), a_Set); + FD_ZERO(a_Read); + FD_ZERO(a_Write); + FD_SET(m_ControlSocket1.GetSocket(), a_Read); cCSLock Lock(m_Parent->m_CS); for (int i = m_NumSlots - 1; i >= 0; --i) @@ -444,11 +438,16 @@ void cSocketThreads::cSocketThread::PrepareSet(fd_set * a_Set, cSocket::xSocket continue; } cSocket::xSocket s = m_Slots[i].m_Socket.GetSocket(); - FD_SET(s, a_Set); + FD_SET(s, a_Read); if (s > a_Highest) { a_Highest = s; } + if (!m_Slots[i].m_Outgoing.empty()) + { + // There's outgoing data for the socket, put it in the Write set + FD_SET(s, a_Write); + } } // for i - m_Slots[] } @@ -480,34 +479,37 @@ void cSocketThreads::cSocketThread::ReadFromSockets(fd_set * a_Read) int Received = m_Slots[i].m_Socket.Receive(Buffer, ARRAYCOUNT(Buffer), 0); if (Received <= 0) { - // The socket has been closed by the remote party - switch (m_Slots[i].m_State) + if (cSocket::GetLastError() != cSocket::ErrWouldBlock) { - case sSlot::ssNormal: - { - // Notify the callback that the remote has closed the socket; keep the slot - m_Slots[i].m_Client->SocketClosed(); - m_Slots[i].m_State = sSlot::ssRemoteClosed; - break; - } - case sSlot::ssWritingRestOut: - case sSlot::ssShuttingDown: - case sSlot::ssShuttingDown2: - { - // Force-close the socket and remove the slot: - m_Slots[i].m_Socket.CloseSocket(); - m_Slots[i] = m_Slots[--m_NumSlots]; - break; - } - default: + // The socket has been closed by the remote party + switch (m_Slots[i].m_State) { - LOG("%s: Unexpected socket state: %d (%s)", - __FUNCTION__, m_Slots[i].m_Socket.GetSocket(), m_Slots[i].m_Socket.GetIPString().c_str() - ); - ASSERT(!"Unexpected socket state"); - break; - } - } // switch (m_Slots[i].m_State) + case sSlot::ssNormal: + { + // Notify the callback that the remote has closed the socket; keep the slot + m_Slots[i].m_Client->SocketClosed(); + m_Slots[i].m_State = sSlot::ssRemoteClosed; + break; + } + case sSlot::ssWritingRestOut: + case sSlot::ssShuttingDown: + case sSlot::ssShuttingDown2: + { + // Force-close the socket and remove the slot: + m_Slots[i].m_Socket.CloseSocket(); + m_Slots[i] = m_Slots[--m_NumSlots]; + break; + } + default: + { + LOG("%s: Unexpected socket state: %d (%s)", + __FUNCTION__, m_Slots[i].m_Socket.GetSocket(), m_Slots[i].m_Socket.GetIPString().c_str() + ); + ASSERT(!"Unexpected socket state"); + break; + } + } // switch (m_Slots[i].m_State) + } } else { @@ -539,7 +541,9 @@ void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write) // Request another chunk of outgoing data: if (m_Slots[i].m_Client != NULL) { - m_Slots[i].m_Client->GetOutgoingData(m_Slots[i].m_Outgoing); + AString Data; + m_Slots[i].m_Client->GetOutgoingData(Data); + m_Slots[i].m_Outgoing.append(Data); } if (m_Slots[i].m_Outgoing.empty()) { @@ -553,8 +557,7 @@ void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write) } } // if (outgoing data is empty) - int Sent = m_Slots[i].m_Socket.Send(m_Slots[i].m_Outgoing.data(), m_Slots[i].m_Outgoing.size()); - if (Sent < 0) + if (!SendDataThroughSocket(m_Slots[i].m_Socket, m_Slots[i].m_Outgoing)) { int Err = cSocket::GetLastError(); LOGWARNING("Error %d while writing to client \"%s\", disconnecting. \"%s\"", Err, m_Slots[i].m_Socket.GetIPString().c_str(), GetOSErrorString(Err).c_str()); @@ -565,7 +568,6 @@ void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write) } return; } - m_Slots[i].m_Outgoing.erase(0, Sent); if (m_Slots[i].m_Outgoing.empty() && (m_Slots[i].m_State == sSlot::ssWritingRestOut)) { @@ -590,8 +592,41 @@ void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write) +bool cSocketThreads::cSocketThread::SendDataThroughSocket(cSocket & a_Socket, AString & a_Data) +{ + // Send data in smaller chunks, so that the OS send buffers aren't overflown easily + while (!a_Data.empty()) + { + size_t NumToSend = std::min(a_Data.size(), (size_t)1024); + int Sent = a_Socket.Send(a_Data.data(), NumToSend); + if (Sent < 0) + { + int Err = cSocket::GetLastError(); + if (Err == cSocket::ErrWouldBlock) + { + // The OS send buffer is full, leave the outgoing data for the next time + return true; + } + // An error has occured + return false; + } + if (Sent == 0) + { + a_Socket.CloseSocket(); + return true; + } + a_Data.erase(0, Sent); + } + return true; +} + + + + + void cSocketThreads::cSocketThread::CleanUpShutSockets(void) { + cCSLock Lock(m_Parent->m_CS); for (int i = m_NumSlots - 1; i >= 0; i--) { switch (m_Slots[i].m_State) @@ -617,3 +652,32 @@ void cSocketThreads::cSocketThread::CleanUpShutSockets(void) +void cSocketThreads::cSocketThread::QueueOutgoingData(void) +{ + cCSLock Lock(m_Parent->m_CS); + for (int i = 0; i < m_NumSlots; i++) + { + if (m_Slots[i].m_Client != NULL) + { + AString Data; + m_Slots[i].m_Client->GetOutgoingData(Data); + m_Slots[i].m_Outgoing.append(Data); + } + if (m_Slots[i].m_Outgoing.empty()) + { + // No outgoing data is ready + if (m_Slots[i].m_State == sSlot::ssWritingRestOut) + { + // The socket doesn't want to be kept alive anymore, and doesn't have any remaining data to send. + // Shut it down and then close it after a timeout, or when the other side agrees + m_Slots[i].m_State = sSlot::ssShuttingDown; + m_Slots[i].m_Socket.ShutdownReadWrite(); + } + continue; + } + } +} + + + + diff --git a/src/OSSupport/SocketThreads.h b/src/OSSupport/SocketThreads.h index 9e1947ab6..fcd2ce11f 100644 --- a/src/OSSupport/SocketThreads.h +++ b/src/OSSupport/SocketThreads.h @@ -66,7 +66,8 @@ public: /** Called when data is received from the remote party */ virtual void DataReceived(const char * a_Data, int a_Size) = 0; - /** Called when data can be sent to remote party; the function is supposed to *append* outgoing data to a_Data */ + /** Called when data can be sent to remote party + The function is supposed to *set* outgoing data to a_Data (overwrite) */ virtual void GetOutgoingData(AString & a_Data) = 0; /** Called when the socket has been closed for any reason */ @@ -156,16 +157,27 @@ private: virtual void Execute(void) override; - /** Puts all sockets into the set, along with m_ControlSocket1. - Only sockets that are able to send and receive data are put in the Set. - Is a_IsForWriting is true, the ssWritingRestOut sockets are added as well. */ - void PrepareSet(fd_set * a_Set, cSocket::xSocket & a_Highest, bool a_IsForWriting); + /** Prepares the Read and Write socket sets for select() + Puts all sockets into the read set, along with m_ControlSocket1. + Only sockets that have outgoing data queued on them are put in the write set.*/ + void PrepareSets(fd_set * a_ReadSet, fd_set * a_WriteSet, cSocket::xSocket & a_Highest); - void ReadFromSockets(fd_set * a_Read); // Reads from sockets indicated in a_Read - void WriteToSockets (fd_set * a_Write); // Writes to sockets indicated in a_Write + /** Reads from sockets indicated in a_Read */ + void ReadFromSockets(fd_set * a_Read); + /** Writes to sockets indicated in a_Write */ + void WriteToSockets (fd_set * a_Write); + + /** Sends data through the specified socket, trying to fill the OS send buffer in chunks. + Returns true if there was no error while sending, false if an error has occured. + Modifies a_Data to contain only the unsent data. */ + bool SendDataThroughSocket(cSocket & a_Socket, AString & a_Data); + /** Removes those slots in ssShuttingDown2 state, sets those with ssShuttingDown state to ssShuttingDown2 */ void CleanUpShutSockets(void); + + /** Calls each client's callback to retrieve outgoing data for that client. */ + void QueueOutgoingData(void); } ; typedef std::list cSocketThreadList; -- cgit v1.2.3 From 4169af1ce1a022d364edfa3313aa5a21d594f198 Mon Sep 17 00:00:00 2001 From: madmaxoft Date: Mon, 27 Jan 2014 21:33:06 +0100 Subject: Fixed Linux compilation. --- src/OSSupport/Socket.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'src') diff --git a/src/OSSupport/Socket.cpp b/src/OSSupport/Socket.cpp index 3248eed88..064abbab5 100644 --- a/src/OSSupport/Socket.cpp +++ b/src/OSSupport/Socket.cpp @@ -6,7 +6,8 @@ #ifndef _WIN32 #include #include - #include //inet_ntoa() + #include // inet_ntoa() + #include // ioctl() #else #define socklen_t int #endif -- cgit v1.2.3 From a359275064ecbaf3608995e82875532419a8ed80 Mon Sep 17 00:00:00 2001 From: madmaxoft Date: Mon, 27 Jan 2014 21:34:54 +0100 Subject: Squashed common code. --- src/OSSupport/Socket.cpp | 21 +++++++-------------- 1 file changed, 7 insertions(+), 14 deletions(-) (limited to 'src') diff --git a/src/OSSupport/Socket.cpp b/src/OSSupport/Socket.cpp index 064abbab5..6afaceedf 100644 --- a/src/OSSupport/Socket.cpp +++ b/src/OSSupport/Socket.cpp @@ -361,24 +361,17 @@ void cSocket::SetNonBlocking(void) #ifdef _WIN32 u_long NonBlocking = 1; int res = ioctlsocket(m_Socket, FIONBIO, &NonBlocking); - if (res != 0) - { - LOGERROR("Cannot set socket to non-blocking. This would make the server deadlock later on, aborting.\nErr: %d, %d, %s", - res, GetLastError(), GetLastErrorString().c_str() - ); - abort(); - } #else int NonBlocking = 1; int res = ioctl(m_Socket, FIONBIO, (char *)&NonBlocking); - if (res != 0) - { - LOGERROR("Cannot set socket to non-blocking. This would make the server deadlock later on, aborting.\nErr: %d, %d, %s", - res, GetLastError(), GetLastErrorString().c_str() - ); - abort(); - } #endif + if (res != 0) + { + LOGERROR("Cannot set socket to non-blocking. This would make the server deadlock later on, aborting.\nErr: %d, %d, %s", + res, GetLastError(), GetLastErrorString().c_str() + ); + abort(); + } } -- cgit v1.2.3