Commit ac6b06a9 authored by Jussi Lindgren's avatar Jussi Lindgren

Server: Client threads now use a condition variable to wait

parent 14c319f6
......@@ -18,6 +18,7 @@
#include <cctype>
#include <cstring>
#include <cmath> // std::isnan, std::isfinite
#include <condition_variable>
#include <cassert>
......@@ -166,23 +167,31 @@ namespace OpenViBEAcquisitionServer
CConnectionClientHandlerThread(CAcquisitionServer& rAcquisitionServer, Socket::IConnection& rConnection)
:m_rAcquisitionServer(rAcquisitionServer)
,m_rConnection(rConnection)
,m_bPleaseQuit(false)
{
}
void operator()(void)
{
do
std::unique_lock<std::mutex> oLock(m_oClientThreadMutex, std::defer_lock);
while(true)
{
CMemoryBuffer* l_pMemoryBuffer=NULL;
// Wait until there is data or we're told to quit
oLock.lock();
m_oPendingBufferCondition.wait(oLock);
if(m_bPleaseQuit || !m_rConnection.isConnected())
{
DoubleLock lock(&m_oPendingBufferProtectionMutex, &m_oPendingBufferExecutionMutex);
break;
}
if(m_vPendingBuffer.size())
{
l_pMemoryBuffer=m_vPendingBuffer.front();
m_vPendingBuffer.pop_front();
}
CMemoryBuffer* l_pMemoryBuffer=NULL;
if(m_vClientPendingBuffer.size())
{
l_pMemoryBuffer=m_vClientPendingBuffer.front();
m_vClientPendingBuffer.pop_front();
}
if(l_pMemoryBuffer)
......@@ -194,35 +203,51 @@ namespace OpenViBEAcquisitionServer
}
else
{
System::Time::sleep((uint32)m_rAcquisitionServer.m_ui64StartedDriverSleepDuration);
// Since we didn't have a memory buffer, maybe it was a spurious wakeup, we'll just go back to waiting
}
oLock.unlock();
}
while(m_rConnection.isConnected());
while(m_vPendingBuffer.size())
while(m_vClientPendingBuffer.size())
{
delete m_vPendingBuffer.front();
m_vPendingBuffer.pop_front();
delete m_vClientPendingBuffer.front();
m_vClientPendingBuffer.pop_front();
}
if(oLock.owns_lock())
{
oLock.unlock();
}
// Tell the main thread we have quit
m_oPendingBufferCondition.notify_one();
}
void scheduleBuffer(const IMemoryBuffer& rMemoryBuffer)
{
CMemoryBuffer* l_pMemoryBuffer=new CMemoryBuffer(rMemoryBuffer);
DoubleLock lock(&m_oPendingBufferProtectionMutex, &m_oPendingBufferExecutionMutex);
{
std::lock_guard<std::mutex> oLock(m_oClientThreadMutex);
m_vClientPendingBuffer.push_back(l_pMemoryBuffer);
}
m_vPendingBuffer.push_back(l_pMemoryBuffer);
m_oPendingBufferCondition.notify_one();
}
CAcquisitionServer& m_rAcquisitionServer;
Socket::IConnection& m_rConnection;
std::deque < CMemoryBuffer* > m_vPendingBuffer;
std::deque < CMemoryBuffer* > m_vClientPendingBuffer;
// Here we use a condition variable to avoid sleeping
std::mutex m_oClientThreadMutex;
std::condition_variable m_oPendingBufferCondition;
// Should this thread quit?
bool m_bPleaseQuit;
// See class DoubleLock
std::mutex m_oPendingBufferProtectionMutex;
std::mutex m_oPendingBufferExecutionMutex;
};
static void start_connection_client_handler_thread(CConnectionClientHandlerThread* pThread)
......@@ -457,6 +482,8 @@ boolean CAcquisitionServer::loop(void)
l_pConnection->release();
if(itConnection->second.m_pConnectionClientHandlerStdThread)
{
requestClientThreadQuit(itConnection->second.m_pConnectionClientHandlerThread);
itConnection->second.m_pConnectionClientHandlerStdThread->join();
delete itConnection->second.m_pConnectionClientHandlerStdThread;
delete itConnection->second.m_pConnectionClientHandlerThread;
......@@ -480,7 +507,7 @@ boolean CAcquisitionServer::loop(void)
l_bResult=true;
l_bTimeout=false;
m_bGotData=false;
uint32 l_ui32StartTime=System::Time::getTime();
const uint32 l_ui32TimeBeforeCall=System::Time::getTime();
while(l_bResult && !m_bGotData && !l_bTimeout)
{
// Calls driver's loop
......@@ -488,7 +515,7 @@ boolean CAcquisitionServer::loop(void)
if(!m_bGotData)
{
System::Time::sleep((uint32)m_ui64StartedDriverSleepDuration);
l_bTimeout=(System::Time::getTime()>l_ui32StartTime+m_ui64DriverTimeoutDuration);
l_bTimeout=(System::Time::getTime()>l_ui32TimeBeforeCall+m_ui64DriverTimeoutDuration);
}
}
if(l_bTimeout)
......@@ -835,6 +862,7 @@ boolean CAcquisitionServer::start(void)
}
// m_pDriverContext->onStart(*m_pDriver->getHeader());
m_ui64StartTime = System::Time::zgetTime();
m_ui64LastDeliveryTime = m_ui64StartTime;
......@@ -848,6 +876,24 @@ boolean CAcquisitionServer::start(void)
m_rKernelContext.getLogManager() << LogLevel_Info << "Now acquiring...\n";
m_bStarted=true;
return true;
}
bool CAcquisitionServer::requestClientThreadQuit(CConnectionClientHandlerThread* th)
{
std::unique_lock<std::mutex> oLock(th->m_oClientThreadMutex);
// Tell the thread to quit
th->m_bPleaseQuit = true;
oLock.unlock();
th->m_oPendingBufferCondition.notify_one();
// Wait for it to quit
oLock.lock();
th->m_oPendingBufferCondition.wait(oLock);
// It should do done now
return true;
}
......@@ -870,6 +916,8 @@ boolean CAcquisitionServer::stop(void)
itConnection->first->close();
if(itConnection->second.m_pConnectionClientHandlerStdThread)
{
requestClientThreadQuit(itConnection->second.m_pConnectionClientHandlerThread);
itConnection->second.m_pConnectionClientHandlerStdThread->join();
delete itConnection->second.m_pConnectionClientHandlerStdThread;
delete itConnection->second.m_pConnectionClientHandlerThread;
......@@ -935,6 +983,7 @@ void CAcquisitionServer::setSamples(const float32* pSample)
void CAcquisitionServer::setSamples(const float32* pSample, const uint32 ui32SampleCount)
{
if(m_bStarted)
{
for(uint32 i=0; i<ui32SampleCount; i++)
......
......@@ -109,6 +109,10 @@ namespace OpenViBEAcquisitionServer
//
virtual OpenViBE::boolean acceptNewConnection(Socket::IConnection* pConnection);
protected:
bool CAcquisitionServer::requestClientThreadQuit(CConnectionClientHandlerThread* th);
public:
// See class DoubleLock
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment