Commit 7f89c0c6 authored by Jussi Lindgren's avatar Jussi Lindgren

Plugins: Tweaks to TCP Writer box

- Added matrix dimensions and bad inputs checking
- Minor code cleanup
- Updated documentation
parent f8302f04
......@@ -13,14 +13,14 @@ requiring unusual dependencies such as VRPN.
Output:
1) If the outputs of the box are raw numeric values, the box first sends every connecting client eight variables of uint32: version number (in network byte order), endianness of the stream (in network byte order, 0==unknown, 1==little, 2==big, 3==pdp), sampling frequency of the signal, the number of channels, the number of samples per chunk and three variables of padding, 8*4=32 bytes in total. The last 6 variables are in the byte order of the stream. Note that only those variables will be non-zero that are meaningful for the input in question.
1) If the outputs of the box are raw numeric values, the box first sends every connecting client eight variables of uint32: format version number (in network byte order), endianness of the stream (in network byte order, 0==unknown, 1==little, 2==big, 3==pdp), sampling frequency of the signal, the number of channels, the number of samples per chunk and three variables of padding, 8*4=32 bytes in total. The last 6 variables are in the byte order of the stream. Note that only those variables will be non-zero that are meaningful for the input in question.
1b) If the output is chosen as hex string or descriptive string (these are valid for Stimulation input only), no header is sent.
2) After the possible global header, the data itself is sent. The data is a stream of float64 for Signal and StreamedMatrix.
The data orientation is [nSamples x nChannels], i.e. all channels for one sample are sent in a sequence, then all channels of the next sample,
and so on (the matrix is a transpose of the usual OpenViBE signal orientation). For Stimulations, the data is uint64 if user chooses raw,
or char strings otherwise.
For signals, the data orientation is [nSamples x nChannels], i.e. all channels for one sample are sent in a sequence, then all channels of the next sample,
and so on (the matrix is a transpose of the usual OpenViBE signal orientation). For matrices, the data is sent as is, in row-major order.
For Stimulations, the data is uint64 if the user chooses raw, or char strings otherwise.
Multiple clients can connect to the socket of the box. The box keeps sending data to each client until either the scenario is stopped or the client disconnects. The box does not guarantee that the client starts to receive the input stream from any particular location. When kernel calls box::process() at time t, all clients connected at time t get forwarded the chunks given to box::process() at t. However, if a client establish a connection during box::process(), it may get a partial chunk of t and the whole chunk of t+1 and so on.
......@@ -75,6 +75,8 @@ __________________________________________________________________
The box sends signal data as chunks with the same chunk size as the stream has.
The box supports only 2 dimensional matrices. To send 1 dimensional matrix, you can try to upgrade it to 2 dimensions with Matrix Transpose box. The box cannot be used to send more than 2 dimensions presently.
The box writes to all the sockets synchronously in the process() call and drops no data. If the connection is too slow to accommodate the data flow, the box will lag.
Detected transmission errors will cause a disconnection of the client.
......
......@@ -47,7 +47,7 @@ void CBoxAlgorithmTCPWriter::handleAccept(const boost::system::error_code& ec, b
this->getLogManager() << LogLevel_Debug << "Handling a new incoming connection\n";
// Send the known configuration to the client
if( m_pActiveDecoder != &m_StimulationDecoder || m_ui64OutputStyle==TCPWRITER_RAW )
if( m_pActiveDecoder != &m_oStimulationDecoder || m_ui64OutputStyle==TCPWRITER_RAW )
{
try
{
......@@ -82,22 +82,23 @@ boolean CBoxAlgorithmTCPWriter::initialize(void)
l_rStaticBoxContext.getInputType(0, m_oInputType);
if(m_oInputType == OV_TypeId_StreamedMatrix)
{
m_pActiveDecoder = &m_MatrixDecoder;
m_pActiveDecoder = &m_oMatrixDecoder;
}
else if(m_oInputType == OV_TypeId_Signal)
{
m_pActiveDecoder = &m_SignalDecoder;
m_pActiveDecoder = &m_oSignalDecoder;
m_oChunkTranspose.setDimensionCount(0);
}
else
{
m_pActiveDecoder = &m_StimulationDecoder;
m_pActiveDecoder = &m_oStimulationDecoder;
}
m_pActiveDecoder->initialize(*this,0);
uint64 l_ui64Port = FSettingValueAutoCast(*this->getBoxAlgorithmContext(), 0);
m_ui64OutputStyle = FSettingValueAutoCast(*this->getBoxAlgorithmContext(), 1);
m_ui32RawVersion = htonl(1);
m_ui32RawVersion = htonl(1); // TCP Writer output format version
#if defined(BOOST_LITTLE_ENDIAN)
m_ui32Endianness = htonl(1);
#elif defined(BOOST_BIG_ENDIAN)
......@@ -201,6 +202,13 @@ boolean CBoxAlgorithmTCPWriter::processInput(uint32 ui32InputIndex)
boolean CBoxAlgorithmTCPWriter::sendToClients(const void* pBuffer, uint32 ui32BufferLength)
{
if(ui32BufferLength==0 || pBuffer == NULL)
{
// Nothing to send, shouldn't happen
this->getLogManager() << LogLevel_Warning << "Asked to send an empty buffer to clients (shouldn't happen)\n";
return false;
}
std::vector<boost::asio::ip::tcp::socket*>::iterator it = m_vSockets.begin();
while(it!=m_vSockets.end())
{
......@@ -255,21 +263,33 @@ boolean CBoxAlgorithmTCPWriter::process(void)
m_pActiveDecoder->decode(j);
if(m_pActiveDecoder->isHeaderReceived())
{
if(m_pActiveDecoder == &m_MatrixDecoder)
if(m_pActiveDecoder == &m_oMatrixDecoder)
{
m_ui32NumberOfChannels = static_cast<uint32> (m_MatrixDecoder.getOutputMatrix()->getDimensionSize(0) );
m_ui32NumberOfSamplesPerChunk = static_cast<uint32> (m_MatrixDecoder.getOutputMatrix()->getDimensionSize(1) );
const uint32 l_ui32NumberOfDimensions = static_cast<uint32> (m_oMatrixDecoder.getOutputMatrix()->getDimensionCount() );
if(l_ui32NumberOfDimensions != 2)
{
this->getLogManager() << LogLevel_Error << "TCPWriter only supports 2 dimensional matrices\n";
return false;
}
m_oChunkTranspose.setDimensionCount(2);
m_oChunkTranspose.setDimensionSize(0,m_ui32NumberOfSamplesPerChunk);
m_oChunkTranspose.setDimensionSize(1,m_ui32NumberOfChannels);
m_ui32NumberOfChannels = static_cast<uint32> (m_oMatrixDecoder.getOutputMatrix()->getDimensionSize(0) );
m_ui32NumberOfSamplesPerChunk = static_cast<uint32> (m_oMatrixDecoder.getOutputMatrix()->getDimensionSize(1) );
}
else if(m_pActiveDecoder == &m_SignalDecoder)
else if(m_pActiveDecoder == &m_oSignalDecoder)
{
m_ui32Frequency = static_cast<uint32> ( m_SignalDecoder.getOutputSamplingRate() );
m_ui32NumberOfChannels = static_cast<uint32> ( m_SignalDecoder.getOutputMatrix()->getDimensionSize(0) );
m_ui32NumberOfSamplesPerChunk = static_cast<uint32> ( m_SignalDecoder.getOutputMatrix()->getDimensionSize(1) );
const uint32 l_ui32NumberOfDimensions = static_cast<uint32> (m_oSignalDecoder.getOutputMatrix()->getDimensionCount() );
if(l_ui32NumberOfDimensions != 2)
{
this->getLogManager() << LogLevel_Error << "TCPWriter only supports 2 dimensional matrices\n";
return false;
}
m_ui32NumberOfChannels = static_cast<uint32> ( m_oSignalDecoder.getOutputMatrix()->getDimensionSize(0) );
m_ui32NumberOfSamplesPerChunk = static_cast<uint32> ( m_oSignalDecoder.getOutputMatrix()->getDimensionSize(1) );
m_ui32Frequency = static_cast<uint32> ( m_oSignalDecoder.getOutputSamplingRate() );\
// C++ is in row major order and openvibe signal matrices are [nChannels x nSamples]. The following buffer
// is used for transposing the chunks to get [nSamples x nChannels] output matrix (only used in case of signals).
m_oChunkTranspose.setDimensionCount(2);
m_oChunkTranspose.setDimensionSize(0,m_ui32NumberOfSamplesPerChunk);
m_oChunkTranspose.setDimensionSize(1,m_ui32NumberOfChannels);
......@@ -278,26 +298,39 @@ boolean CBoxAlgorithmTCPWriter::process(void)
{
// Stimulus, do nothing
}
// Conformance checking for all matrix based streams
if(m_pActiveDecoder == &m_oMatrixDecoder || m_pActiveDecoder == &m_oSignalDecoder)
{
if(m_ui32NumberOfChannels == 0 || m_ui32NumberOfSamplesPerChunk == 0)
{
this->getLogManager() << LogLevel_Error << "For matrix-like inputs, both input dimensions must be larger than 0\n";
return false;
}
}
}
if(m_pActiveDecoder->isBufferReceived())
{
if(m_pActiveDecoder == &m_MatrixDecoder)
if(m_pActiveDecoder == &m_oMatrixDecoder)
{
const IMatrix* l_pMatrix = m_MatrixDecoder.getOutputMatrix();
const float64 *l_pSource = l_pMatrix->getBuffer();
const IMatrix* l_pMatrix = m_oMatrixDecoder.getOutputMatrix();
sendToClients((void *)l_pSource, l_pMatrix->getBufferElementCount()*sizeof(float64));
sendToClients((void *)l_pMatrix->getBuffer(), l_pMatrix->getBufferElementCount()*sizeof(float64));
}
else if(m_pActiveDecoder == &m_SignalDecoder)
else if(m_pActiveDecoder == &m_oSignalDecoder)
{
const IMatrix* l_pMatrix = m_SignalDecoder.getOutputMatrix();
const IMatrix* l_pMatrix = m_oSignalDecoder.getOutputMatrix();
// Transpose
const float64 *l_pSource = l_pMatrix->getBuffer();
float64 *l_pDestination = m_oChunkTranspose.getBuffer();
for(uint32 c=0;c<m_ui32NumberOfChannels;c++)
if(m_oChunkTranspose.getDimensionCount() == 0) {
this->getLogManager() << LogLevel_Debug << "Box received buffer before header. Bad.\n";
}
for(uint32 c=0;c<m_ui32NumberOfChannels;c++)
{
for(uint32 s=0;s<m_ui32NumberOfSamplesPerChunk;s++)
for(uint32 s=0;s<m_ui32NumberOfSamplesPerChunk;s++)
{
l_pDestination[s*m_ui32NumberOfChannels+c] = l_pSource[c*m_ui32NumberOfSamplesPerChunk+s];
}
......@@ -307,7 +340,7 @@ boolean CBoxAlgorithmTCPWriter::process(void)
}
else // stimulus
{
const IStimulationSet* l_pStimulations = m_StimulationDecoder.getOutputStimulationSet();
const IStimulationSet* l_pStimulations = m_oStimulationDecoder.getOutputStimulationSet();
for(uint32 j=0; j<l_pStimulations->getStimulationCount(); j++)
{
const uint64 l_ui64StimulationCode = l_pStimulations->getStimulationIdentifier(j);
......
......@@ -64,10 +64,10 @@ namespace OpenViBEPlugins
OpenViBE::boolean sendToClients(const void* pBuffer, OpenViBE::uint32 ui32BufferLength);
// Stream decoder
OpenViBEToolkit::TStimulationDecoder < CBoxAlgorithmTCPWriter > m_StimulationDecoder;
OpenViBEToolkit::TStreamedMatrixDecoder < CBoxAlgorithmTCPWriter > m_MatrixDecoder;
OpenViBEToolkit::TSignalDecoder < CBoxAlgorithmTCPWriter > m_SignalDecoder;
OpenViBEToolkit::TDecoder < CBoxAlgorithmTCPWriter > *m_pActiveDecoder;
OpenViBEToolkit::TStimulationDecoder < CBoxAlgorithmTCPWriter > m_oStimulationDecoder;
OpenViBEToolkit::TStreamedMatrixDecoder < CBoxAlgorithmTCPWriter > m_oMatrixDecoder;
OpenViBEToolkit::TSignalDecoder < CBoxAlgorithmTCPWriter > m_oSignalDecoder;
OpenViBEToolkit::TDecoder < CBoxAlgorithmTCPWriter >* m_pActiveDecoder;
boost::asio::io_service m_oIOService;
boost::asio::ip::tcp::acceptor* m_pAcceptor;
......@@ -146,7 +146,7 @@ namespace OpenViBEPlugins
virtual OpenViBE::CString getShortDescription(void) const { return OpenViBE::CString("Send input stream out via a TCP socket"); }
virtual OpenViBE::CString getDetailedDescription(void) const { return OpenViBE::CString("\n"); }
virtual OpenViBE::CString getCategory(void) const { return OpenViBE::CString("Acquisition and network IO"); }
virtual OpenViBE::CString getVersion(void) const { return OpenViBE::CString("0.1"); }
virtual OpenViBE::CString getVersion(void) const { return OpenViBE::CString("0.2"); }
virtual OpenViBE::CString getStockItemName(void) const { return OpenViBE::CString("gtk-connect"); }
virtual OpenViBE::CIdentifier getCreatedClass(void) const { return OVP_ClassId_BoxAlgorithm_TCPWriter; }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment