Commit e87b9dea authored by Jussi Lindgren's avatar Jussi Lindgren

Server: Unified chunk time boundary arithmetic to match Acquisition Client box

- Additionally, removed redundant extra copy of the stimulations buffer
parent a9bec01e
......@@ -26,6 +26,8 @@
#include <iostream>
// #include <iomanip>
#define boolean OpenViBE::boolean
namespace
......@@ -425,19 +427,26 @@ boolean CAcquisitionServer::loop(void)
l_i64SignedTheoreticalSampleCountToSkip=((int64(itConnection->second.m_ui64ConnectionTime-m_ui64LastDeliveryTime)*m_ui32SamplingFrequency)>>32)+m_vPendingBuffer.size();
}
uint64 l_ui64TheoreticalSampleCountToSkip=(l_i64SignedTheoreticalSampleCountToSkip<0?0:uint64(l_i64SignedTheoreticalSampleCountToSkip));
const uint64 l_ui64TheoreticalSampleCountToSkip=(l_i64SignedTheoreticalSampleCountToSkip<0?0:uint64(l_i64SignedTheoreticalSampleCountToSkip));
m_rKernelContext.getLogManager() << LogLevel_Trace << "Sample count offset at connection : " << l_ui64TheoreticalSampleCountToSkip << "\n";
if ( (m_ui64SampleCount-m_vPendingBuffer.size()) % m_ui32SampleCountPerSentBlock != 0)
m_rKernelContext.getLogManager() << LogLevel_Error << "Buffer start sample " << m_ui64SampleCount-m_vPendingBuffer.size() << " doesn't seem to be divisible by "
<< m_ui32SampleCountPerSentBlock << " (case B)\n";
const uint64 l_ui64BufferDuration = ip_ui64BufferDuration;
const uint64 l_ui64PastBufferCount = (m_ui64SampleCount-m_vPendingBuffer.size())/m_ui32SampleCountPerSentBlock;
const uint64 l_ui64ConnBufferTimeOffset = ITimeArithmetics::sampleCountToTime(m_ui32SamplingFrequency, l_ui64TheoreticalSampleCountToSkip);
SConnectionInfo l_oInfo;
l_oInfo.m_ui64ConnectionTime=itConnection->second.m_ui64ConnectionTime;
l_oInfo.m_ui64StimulationTimeOffset=ITimeArithmetics::sampleCountToTime(m_ui32SamplingFrequency, l_ui64TheoreticalSampleCountToSkip+m_ui64SampleCount-m_vPendingBuffer.size());
l_oInfo.m_ui64SignalSampleCountToSkip=l_ui64TheoreticalSampleCountToSkip;
l_oInfo.m_ui64StimulationTimeOffset = l_ui64PastBufferCount * l_ui64BufferDuration + l_ui64ConnBufferTimeOffset;
l_oInfo.m_ui64SignalSampleCountToSkip = l_ui64TheoreticalSampleCountToSkip;
l_oInfo.m_pConnectionClientHandlerThread=new CConnectionClientHandlerThread(*this, *l_pConnection);
l_oInfo.m_pConnectionClientHandlerStdThread=new std::thread(std::bind(&start_connection_client_handler_thread, l_oInfo.m_pConnectionClientHandlerThread));
l_oInfo.m_bChannelLocalisationSent = false;
l_oInfo.m_bChannelUnitsSent = false;
//applyPriority(l_oInfo.m_pConnectionClientHandlerStdThread,15);
m_vConnection.push_back(pair < Socket::IConnection*, SConnectionInfo >(l_pConnection, l_oInfo));
......@@ -565,17 +574,23 @@ boolean CAcquisitionServer::loop(void)
const int64 p = m_ui64SampleCount-m_vPendingBuffer.size();
if (p < 0)
m_rKernelContext.getLogManager() << LogLevel_Error << "Signed number used for bit operations:" << p << " (case A)\n";
if (p % m_ui32SampleCountPerSentBlock != 0)
m_rKernelContext.getLogManager() << LogLevel_Error << "Buffer start sample " << p << " doesn't seem to be divisible by "
<< m_ui32SampleCountPerSentBlock << " (case A)\n";
// n.b. here we use arithmetic based on buffer duration so that we are in perfect agreement with
// Acquisition Client box that sets the chunk starts and ends by steps of buffer duration.
const uint64 l_ui64BufferDuration = ip_ui64BufferDuration;
const uint64 l_ui64BufferStartSamples = m_ui64SampleCount-m_vPendingBuffer.size();
const uint64 l_ui64BufferEndSamples = m_ui64SampleCount-m_vPendingBuffer.size()+m_ui32SampleCountPerSentBlock;
const uint64 l_ui64BufferStartTime = ITimeArithmetics::sampleCountToTime(m_ui32SamplingFrequency, l_ui64BufferStartSamples);
const uint64 l_ui64BufferEndTime = ITimeArithmetics::sampleCountToTime(m_ui32SamplingFrequency, l_ui64BufferEndSamples);
const uint64 l_ui64SampleTime = ITimeArithmetics::sampleCountToTime(m_ui32SamplingFrequency, m_ui64SampleCount);
const uint64 l_ui64PastBufferCount = l_ui64BufferStartSamples / m_ui32SampleCountPerSentBlock;
const uint64 l_ui64BufferStartTime = l_ui64PastBufferCount * l_ui64BufferDuration;
const uint64 l_ui64BufferEndTime = l_ui64BufferStartTime + l_ui64BufferDuration;
const uint64 l_ui64LastSampleTime = ITimeArithmetics::sampleCountToTime(m_ui32SamplingFrequency, m_ui64SampleCount);
// Pass the stimuli and buffer to all plugins; note that they may modify them
for(std::vector<IAcquisitionServerPlugin*>::iterator itp = m_vPlugins.begin(); itp != m_vPlugins.end(); ++itp)
for(auto itp = m_vPlugins.begin(); itp != m_vPlugins.end(); ++itp)
{
(*itp)->loopHook(m_vPendingBuffer, m_oPendingStimulationSet, l_ui64BufferStartTime, l_ui64BufferEndTime, l_ui64SampleTime);
(*itp)->loopHook(m_vPendingBuffer, m_oPendingStimulationSet, l_ui64BufferStartTime, l_ui64BufferEndTime, l_ui64LastSampleTime);
}
// Handle connections
......@@ -599,27 +614,38 @@ boolean CAcquisitionServer::loop(void)
}
}
// int l = l_oStimulationSet.getStimulationCount();
const int64 p = l_ui64BufferStartSamples+l_rInfo.m_ui64SignalSampleCountToSkip;
if (p < 0)
m_rKernelContext.getLogManager() << LogLevel_Error << "Signed number used for bit operations:" << p << " (case B)\n";
const uint64 l_ui64ConnBlockStartTime = ITimeArithmetics::sampleCountToTime(m_ui32SamplingFrequency, l_ui64BufferStartSamples + l_rInfo.m_ui64SignalSampleCountToSkip);
const uint64 l_ui64ConnBlockEndTime = ITimeArithmetics::sampleCountToTime(m_ui32SamplingFrequency, l_ui64BufferEndSamples + l_rInfo.m_ui64SignalSampleCountToSkip);
// Boundaries of the part of the buffer to be sent to this connection
const uint64 l_ui64ConnBufferTimeOffset = ITimeArithmetics::sampleCountToTime(m_ui32SamplingFrequency, l_rInfo.m_ui64SignalSampleCountToSkip);
const uint64 l_ui64ConnBlockStartTime = l_ui64BufferStartTime + l_ui64ConnBufferTimeOffset;
const uint64 l_ui64ConnBlockEndTime = l_ui64BufferEndTime + l_ui64ConnBufferTimeOffset;
//m_rKernelContext.getLogManager() << LogLevel_Info << "start: " << time64(start) << "end: " << time64(end) << "\n";
// Stimulation buffer
CStimulationSet l_oStimulationSet;
IStimulationSet& l_oStimulationSet = *ip_pStimulationSet;
l_oStimulationSet.clear();
// Take the stimuli range valid for the connection
OpenViBEToolkit::Tools::StimulationSet::appendRange(
l_oStimulationSet,
m_oPendingStimulationSet,
l_ui64ConnBlockStartTime,l_ui64ConnBlockEndTime);
OpenViBEToolkit::Tools::StimulationSet::copy(*ip_pStimulationSet, l_oStimulationSet, -int64(l_rInfo.m_ui64StimulationTimeOffset));
// Take the stimuli range valid for the buffer and adjust wrt connection time (stamp at connection = stamp at time 0 for the client)
for(size_t k=0;k<m_oPendingStimulationSet.getStimulationCount();k++)
{
const uint64 lDate = m_oPendingStimulationSet.getStimulationDate(k); // this date is wrt the whole acquisition time in the server
if(lDate >= l_ui64ConnBlockStartTime && lDate <= l_ui64ConnBlockEndTime)
{
// The new date is wrt the specific connection time of the client (i.e. the chunk times on Designer side)
const uint64 lNewDate =
( (lDate > l_rInfo.m_ui64StimulationTimeOffset) ? (lDate - l_rInfo.m_ui64StimulationTimeOffset) : 0 );
/*
std::cout << std::setprecision(10) << ITimeArithmetics::timeToSeconds(lDate)
<< " to " << ITimeArithmetics::timeToSeconds(lNewDate)
<< " for [" << ITimeArithmetics::timeToSeconds(l_ui64BufferStartTime)
<< "," << ITimeArithmetics::timeToSeconds(l_ui64BufferEndTime) << "]"
<< "\n";
*/
l_oStimulationSet.appendStimulation(m_oPendingStimulationSet.getStimulationIdentifier(k), lNewDate, m_oPendingStimulationSet.getStimulationDuration(k));
}
}
// Send a chunk of channel units? Note that we'll always send the units header.
if(!l_rInfo.m_bChannelUnitsSent)
......@@ -649,17 +675,19 @@ boolean CAcquisitionServer::loop(void)
}
else
{
// Here sample count to skip >= block size. n.b. This is the reason why the pending buffer size needs to be 2x,
// as the skip can be up to 1x bufferSize and after that we need a full buffer to send. Note that by this construction
// Here sample count to skip >= block size, so we effective drop this chunk from the viewpoint of this connection.
// n.b. This is the reason why the pending buffer size needs to be 2x, as the skip can be up to 1x bufferSize and
// after that we need a full buffer to send. Note that by construction
// the count to skip can never underflow but remains >= 0 (the other if branch doesnt decrement).
l_rInfo.m_ui64SignalSampleCountToSkip-=m_ui32SampleCountPerSentBlock;
}
}
// Clears pending stimulations
// Clears pending stimulations; Can start from zero as we know we'll never send anything in the future thats
// before current BufferEndTime.
OpenViBEToolkit::Tools::StimulationSet::removeRange(
m_oPendingStimulationSet,
l_ui64BufferStartTime,
0,
l_ui64BufferEndTime
);
......
......@@ -25,9 +25,9 @@ namespace OpenViBEAcquisitionServer
typedef struct
{
OpenViBE::uint64 m_ui64ConnectionTime;
OpenViBE::uint64 m_ui64StimulationTimeOffset;
OpenViBE::uint64 m_ui64SignalSampleCountToSkip;
OpenViBE::uint64 m_ui64ConnectionTime; // Time the client connected
OpenViBE::uint64 m_ui64StimulationTimeOffset; // Time offset wrt acquisition start
OpenViBE::uint64 m_ui64SignalSampleCountToSkip; // How many samples to skip wrt current buffer start. n.b. not a constant.
CConnectionClientHandlerThread* m_pConnectionClientHandlerThread; // Ptr to the class object that is executed by the client connection handler thread
std::thread* m_pConnectionClientHandlerStdThread; // The actual thread handle
bool m_bChannelUnitsSent;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment