Commit 56dd5e86 authored by nfoy's avatar nfoy
Browse files

implement tcp tagging

parent 7f30cc68
......@@ -43,7 +43,8 @@ namespace OpenViBEAcquisitionServer
virtual void loopHook(std::vector < std::vector < OpenViBE::float32 > >& vPendingBuffer,
OpenViBE::CStimulationSet& oStimulationSet,
OpenViBE::uint64 start,
OpenViBE::uint64 end) {}
OpenViBE::uint64 end,
OpenViBE::uint64 sampleTime) {}
/// Hook called at the end of the acceptNewConnection() function of AcquisitionServer
virtual void acceptNewConnectionHook() {}
......
......@@ -543,11 +543,12 @@ boolean CAcquisitionServer::loop(void)
const uint64 l_ui64BufferEndSamples = m_ui64SampleCount-m_vPendingBuffer.size()+m_ui32SampleCountPerSentBlock;
const uint64 l_ui64BufferStartTime = ITimeArithmetics::sampleCountToTime(m_ui32SamplingFrequency, l_ui64BufferStartSamples);
const uint64 l_ui64BufferEndTime = ITimeArithmetics::sampleCountToTime(m_ui32SamplingFrequency, l_ui64BufferEndSamples);
const uint64 l_ui64SampleTime = ITimeArithmetics::sampleCountToTime(m_ui32SamplingFrequency, m_ui64SampleCount);
// Pass the stimuli and buffer to all plugins; note that they may modify them
for(std::vector<IAcquisitionServerPlugin*>::iterator itp = m_vPlugins.begin(); itp != m_vPlugins.end(); ++itp)
{
(*itp)->loopHook(m_vPendingBuffer, m_oPendingStimulationSet, l_ui64BufferStartTime, l_ui64BufferEndTime);
(*itp)->loopHook(m_vPendingBuffer, m_oPendingStimulationSet, l_ui64BufferStartTime, l_ui64BufferEndTime, l_ui64SampleTime);
}
// Handle connections
......
......@@ -5,6 +5,10 @@ INCLUDE_DIRECTORIES(${ADDITIONAL_PATH})
FILE(GLOB_RECURSE additional_source_files ${ADDITIONAL_PATH}/*.cpp ${ADDITIONAL_PATH}/*.h)
SET(source_files "${source_files};${additional_source_files}")
SET(ADDITIONAL_PATH "${CMAKE_SOURCE_DIR}/contrib/plugins/server-extensions/tcp-tagging/")
INCLUDE_DIRECTORIES(${ADDITIONAL_PATH})
FILE(GLOB_RECURSE additional_source_files ${ADDITIONAL_PATH}/*.cpp ${ADDITIONAL_PATH}/*.h)
SET(source_files "${source_files};${additional_source_files}")
FUNCTION(OV_ADD_CONTRIB_DRIVER DRIVER_PATH)
......
......@@ -5,6 +5,7 @@
*/
#include "ovasCPluginExternalStimulations.h"
#include "ovasCPluginTCPTagging.h"
#include "ovasCDriverBrainmasterDiscovery.h"
#include "ovasCDriverBrainProductsBrainVisionRecorder.h"
......@@ -67,6 +68,9 @@ namespace OpenViBEContributions {
#endif
pGUI->registerPlugin(new OpenViBEAcquisitionServer::OpenViBEAcquisitionServerPlugins::CPluginExternalStimulations(rKernelContext));
// register tcp-tagging plugin
pGUI->registerPlugin(new OpenViBEAcquisitionServer::OpenViBEAcquisitionServerPlugins::CPluginTCPTagging(rKernelContext));
}
}
......@@ -61,7 +61,7 @@ void CPluginExternalStimulations::startHook(const std::vector<OpenViBE::CString>
}
void CPluginExternalStimulations::loopHook(std::vector < std::vector < OpenViBE::float32 > >& /* vPendingBuffer */, CStimulationSet &stimulationSet, uint64 start, uint64 end)
void CPluginExternalStimulations::loopHook(std::vector < std::vector < OpenViBE::float32 > >& /* vPendingBuffer */, CStimulationSet &stimulationSet, uint64 start, uint64 end, uint64 /* sampleTime */)
{
if (m_bIsExternalStimulationsEnabled)
{
......
......@@ -32,7 +32,7 @@ namespace OpenViBEAcquisitionServer
virtual void startHook(const std::vector<OpenViBE::CString>&, OpenViBE::uint32 ui32SamplingFrequency, OpenViBE::uint32 ui32ChannelCount, OpenViBE::uint32 ui32SampleCountPerSentBlock);
virtual void stopHook();
virtual void loopHook(std::vector < std::vector < OpenViBE::float32 > >& vPendingBuffer,
OpenViBE::CStimulationSet& stimulationSet, OpenViBE::uint64 start, OpenViBE::uint64 end);
OpenViBE::CStimulationSet& stimulationSet, OpenViBE::uint64 start, OpenViBE::uint64 end, OpenViBE::uint64 sampleTime);
virtual void acceptNewConnectionHook();
......
#include "ovasCPluginTCPTagging.h"
#include <set>
#include <unistd.h>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/asio.hpp>
#include <boost/thread/thread.hpp>
#include <boost/lockfree/queue.hpp>
#include <sys/timeb.h>
// Implementation of PluginTCPTagging.
// The plugin relies on three auxilliary classes: TagSession, TagServer and TagStream.
// TagServer implements a server that simply binds to a port and waits for incoming connections.
// TagSession represents an individual connection with a client and holds a connection handle (socket)
// and a data buffer to store incoming data.
// The use of shared pointers is instrumental to ensure that instances are still alive when call-backs are
// called and avoid memory corruption.
// The TagStream class implements a stream to allow to collect tags. Upon instantiation, it creates an instance
// of TagServer and starts the server in an auxilliary thread.
// The exchange of data between the main tread and the auxilliary thread is performed via a lockfree queue (boost).
using boost::asio::ip::tcp;
using namespace OpenViBE;
using namespace OpenViBEAcquisitionServer;
using namespace OpenViBEAcquisitionServerPlugins;
// A Tag consists of an identifier to inform about the type of event
// and a timestamp corresponding to the time at which the event occurrs.
struct Tag
{
uint64 padding, identifier, timestamp;
};
class TagSession; // forward declaration of Tagging Session to define SessionPtr
typedef boost::shared_ptr<boost::lockfree::queue<Tag> > QueuePtr;
typedef boost::shared_ptr<TagSession> SessionPtr;
// An instance of TagSession is associated to every client connecting to the Tagging Server.
// It contains a connection handle and data buffer.
class TagSession : public boost::enable_shared_from_this<TagSession>
{
public:
TagSession(boost::asio::io_service& io_service, std::set<SessionPtr>& sessionSet, const QueuePtr& queue)
: m_socket(io_service), m_sessionSet(sessionSet), m_queuePtr(queue)
{
}
tcp::socket& socket()
{
return m_socket;
}
void start()
{
m_sessionSet.insert(shared_from_this());
startRead();
}
void startRead()
{
// Caveat: a shared pointer is used (instead of simply using this) to ensure that this instance of TagSession is still alive when the call-back is called.
boost::asio::async_read(m_socket, boost::asio::buffer((void *) &m_tag, sizeof(Tag)), boost::bind(&TagSession::handleRead, shared_from_this(), _1));
}
void handleRead(const boost::system::error_code& error)
{
if (!error) {
m_queuePtr->push(m_tag);
// Continue reading.
startRead();
}
else {
// Eventually close the connection (once all shared references to this instance have been destroyed).
m_sessionSet.erase(shared_from_this());
}
}
private:
Tag m_tag;
tcp::socket m_socket;
std::set<SessionPtr>& m_sessionSet;
QueuePtr m_queuePtr;
};
// TagServer implements a server that binds to a port and accepts new connections.
class TagServer
{
public:
// Server port.
enum {PORT = 15361};
TagServer(const QueuePtr& queue)
: m_ioService(), m_acceptor(m_ioService, tcp::endpoint(tcp::v4(), PORT)), m_queuePtr(queue)
{
}
void run()
{
try {
startAccept();
m_ioService.run();
}
catch(std::exception& e) {
// TODO: log error message
}
}
private:
void startAccept()
{
SessionPtr newSession (new TagSession(m_ioService, m_sessionSet, m_queuePtr));
// Note: if this instance of TaggingSever is destroyed then the associated io_service is destroyed as well.
// Therefore the call-back will never be called if this instance is destroyed and it is safe to use this instead of shared pointer.
m_acceptor.async_accept(newSession->socket(), boost::bind(&TagServer::handleAccept, this, newSession, _1));
}
void handleAccept(SessionPtr session, const boost::system::error_code& error)
{
if (!error) {
session->start();
}
startAccept();
}
private:
boost::asio::io_service m_ioService;
tcp::acceptor m_acceptor;
std::set<SessionPtr> m_sessionSet;
const QueuePtr& m_queuePtr;
};
// TagStream allows to collect tags received via TCP.
class TagStream
{
// Initial memory allocation of lockfree queue.
enum {ALLOCATE = 128};
public:
TagStream()
: m_queuePtr(new boost::lockfree::queue<Tag>(ALLOCATE))
{
boost::thread thread (&TagStream::startServer, this);
}
bool pop(Tag& tag)
{
return m_queuePtr->pop(tag);
}
private:
void startServer()
{
TagServer server(m_queuePtr);
server.run();
}
private:
QueuePtr m_queuePtr;
};
static TagStream tagStream;
// CPluginTCPTagging implementation
CPluginTCPTagging::CPluginTCPTagging(const OpenViBE::Kernel::IKernelContext& rKernelContext)
: IAcquisitionServerPlugin(rKernelContext, CString("AcquisitionServer_Plugin_TCPTagging"))
{
m_rKernelContext.getLogManager() << Kernel::LogLevel_Info << "Loading plugin: TCP Tagging\n";
}
CPluginTCPTagging::~CPluginTCPTagging()
{
}
void CPluginTCPTagging::startHook(const std::vector<OpenViBE::CString>& vSelectedChannelNames,
OpenViBE::uint32 ui32SamplingFrequency, OpenViBE::uint32 ui32ChannelCount, OpenViBE::uint32 ui32SampleCountPerSentBlock)
{
// Get POSIX time (number of milliseconds since epoch)
timeb time_buffer;
ftime(&time_buffer);
uint64 posixTime = time_buffer.time*1000 + time_buffer.millitm;
// Initialize time counters.
m_previousPosixTime = posixTime;
m_previousSampleTime = 0;
// Clear Tag stream
Tag tag;
while(tagStream.pop(tag));
}
void CPluginTCPTagging::loopHook(std::vector < std::vector < OpenViBE::float32 > >& /*vPendingBuffer*/,
OpenViBE::CStimulationSet& stimulationSet, uint64 start, uint64 end, uint64 sampleTime)
{
// Get POSIX time (number of milliseconds since epoch)
timeb time_buffer;
ftime(&time_buffer);
uint64 posixTime = time_buffer.time*1000 + time_buffer.millitm;
Tag tag;
// Collect tags from the stream until exhaustion.
while(tagStream.pop(tag)) {
m_rKernelContext.getLogManager() << Kernel::LogLevel_Info << "New Tag received (" << tag.padding << ", " << tag.identifier << ", " << tag.timestamp << ")\n";
// Marker time correction (simple local linear interpolation).
if (m_previousPosixTime != posixTime)
tag.timestamp = m_previousSampleTime + (tag.timestamp - m_previousPosixTime)* (sampleTime - m_previousSampleTime) / (posixTime - m_previousPosixTime);
// Insert tag into the stimulation set.
stimulationSet.appendStimulation(tag.identifier, tag.timestamp, 40 /* duration of tag (ms) */);
}
// Update time counters.
m_previousPosixTime = posixTime;
m_previousSampleTime = sampleTime;
}
#ifndef __OpenViBE_AcquisitionServer_TCPTagging_H__
#define __OpenViBE_AcquisitionServer_TCPTagging_H__
/**
* \brief Acquisition Server plugin adding the capability to receive stimulations from external sources
* via TCP.
*/
#include "ovasIAcquisitionServerPlugin.h"
namespace OpenViBEAcquisitionServer
{
namespace OpenViBEAcquisitionServerPlugins
{
class CPluginTCPTagging : public IAcquisitionServerPlugin
{
public:
CPluginTCPTagging(const OpenViBE::Kernel::IKernelContext& rKernelContext);
~CPluginTCPTagging();
// Overrides virtual method startHook inherited from class IAcquisitionServerPlugin.
void startHook(const std::vector<OpenViBE::CString>& vSelectedChannelNames, OpenViBE::uint32 ui32SamplingFrequency,
OpenViBE::uint32 ui32ChannelCount, OpenViBE::uint32 ui32SampleCountPerSentBlock);
// Overrides virtual method loopHook inherited from class IAcquisitionServerPlugin.
void loopHook(std::vector < std::vector < OpenViBE::float32 > >& vPendingBuffer,
OpenViBE::CStimulationSet& stimulationSet, OpenViBE::uint64 start, OpenViBE::uint64 end, OpenViBE::uint64 sampleTime);
private:
OpenViBE::uint64 m_previousPosixTime;
OpenViBE::uint64 m_previousSampleTime;
};
}
}
#endif // __OpenViBE_AcquisitionServer_TCPTagging_H__
......@@ -114,7 +114,7 @@ void CPluginLSLOutput::startHook(const std::vector<OpenViBE::CString>& vSelected
}
void CPluginLSLOutput::loopHook(std::vector < std::vector < OpenViBE::float32 > >& vPendingBuffer, CStimulationSet &stimulationSet, uint64 start, uint64 end)
void CPluginLSLOutput::loopHook(std::vector < std::vector < OpenViBE::float32 > >& vPendingBuffer, CStimulationSet &stimulationSet, uint64 start, uint64 end, uint64 /* sampleTime */)
{
if (m_bIsLSLOutputEnabled)
{
......@@ -172,4 +172,4 @@ void CPluginLSLOutput::stopHook()
}
#endif
\ No newline at end of file
#endif
......@@ -30,7 +30,7 @@ namespace OpenViBEAcquisitionServer
virtual void startHook(const std::vector<OpenViBE::CString>& vSelectedChannelNames, OpenViBE::uint32 ui32SamplingFrequency, OpenViBE::uint32 ui32ChannelCount, OpenViBE::uint32 ui32SampleCountPerSentBlock);
virtual void stopHook();
virtual void loopHook(std::vector < std::vector < OpenViBE::float32 > >& vPendingBuffer,
OpenViBE::CStimulationSet& stimulationSet, OpenViBE::uint64 start, OpenViBE::uint64 end);
OpenViBE::CStimulationSet& stimulationSet, OpenViBE::uint64 start, OpenViBE::uint64 end, OpenViBE::uint64 sampleTime);
// Plugin implementation
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment