Commit 5977b832 authored by nfoy's avatar nfoy
Browse files

port TCP tagging plugin to Windows 7

parent 805a179b
......@@ -32,11 +32,6 @@ for i in range(100):
event_id=list(to_byte(EVENT_ID, 8))
timestamp=list(to_byte(int(time()*1000)+DELAY, 8))
# transform from little endian to big endian
padding.reverse()
event_id.reverse()
timestamp.reverse()
# send tag and sleep
s.sendall(bytes(padding+event_id+timestamp))
sleep(1)
......
#include "ovasCPluginTCPTagging.h"
#include <set>
#include <unistd.h>
#include <queue>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/asio.hpp>
#include <boost/thread/thread.hpp>
#include <boost/lockfree/queue.hpp>
#include <sys/timeb.h>
#if defined TARGET_OS_Linux
#include <endian.h>
#elif defined TARGET_OS_Windows
#include <winsock2.h>
#define be64toh(x) ntohll(x)
#endif
// Implementation of PluginTCPTagging.
// The plugin relies on three auxilliary classes: TagSession, TagServer and TagStream.
// The plugin relies on four auxilliary classes: TagQueue, TagSession, TagServer and TagStream.
// TagQueue implements a trivial queue to store tags with exclusive locking.
// TagServer implements a server that simply binds to a port and waits for incoming connections.
// TagSession represents an individual connection with a client and holds a connection handle (socket)
// and a data buffer to store incoming data.
......@@ -40,17 +33,47 @@ struct Tag
uint64 padding, identifier, timestamp;
};
class TagSession; // forward declaration of Tagging Session to define SharedSessionPtr
class TagSession; // forward declaration of TagSession to define SharedSessionPtr
class TagQueue; // forward declaration of TagQueue to define SharedQueuePtr
typedef boost::shared_ptr<boost::lockfree::queue<Tag> > SharedQueuePtr;
typedef boost::shared_ptr<TagQueue> SharedQueuePtr;
typedef boost::shared_ptr<TagSession> SharedSessionPtr;
// A trivial implementation of a queue to store Tags with exclusive locking
class TagQueue
{
public:
TagQueue()
{
}
void push(const Tag& tag)
{
boost::lock_guard<boost::mutex> guard(m_mutex);
m_queue.push(tag);
}
bool pop(Tag& tag)
{
boost::lock_guard<boost::mutex> guard(m_mutex);
if (m_queue.size()==0) return false;
else {
tag = m_queue.front();
m_queue.pop();
return true;
}
}
private:
std::queue<Tag> m_queue;
boost::mutex m_mutex;
};
// An instance of TagSession is associated to every client connecting to the Tagging Server.
// It contains a connection handle and data buffer.
class TagSession : public boost::enable_shared_from_this<TagSession>
{
public:
TagSession(boost::asio::io_service& io_service, const SharedQueuePtr& queue)
TagSession(boost::asio::io_service& io_service, const SharedQueuePtr& queue)
: m_socket(io_service), m_queuePtr(queue)
{
}
......@@ -74,16 +97,11 @@ public:
void handleRead(const boost::system::error_code& error)
{
if (!error) {
// Correct endianness.
m_tag.padding = be64toh(m_tag.padding);
m_tag.identifier = be64toh(m_tag.identifier);
m_tag.timestamp = be64toh(m_tag.timestamp);
// Push tag to the queue.
m_queuePtr->push(m_tag);
// Continue reading.
startRead();
startRead();
}
}
......@@ -113,16 +131,16 @@ public:
m_ioService.run();
}
catch(std::exception& e) {
// TODO: log error message
}
// TODO: log error message
}
}
private:
void startAccept()
{
SharedSessionPtr newSession (new TagSession(m_ioService, m_queuePtr));
SharedSessionPtr newSession (new TagSession(m_ioService, m_queuePtr));
// Note: if this instance of TaggingSever is destroyed then the associated io_service is destroyed as well.
// Therefore the call-back will never be called if this instance is destroyed and it is safe to use this instead of shared pointer.
// Therefore the call-back will never be called if this instance is destroyed and it is safe to use this instead of shared pointer.
m_acceptor.async_accept(newSession->socket(), boost::bind(&TagServer::handleAccept, this, newSession, _1));
}
......@@ -147,12 +165,12 @@ class TagStream
// Initial memory allocation of lockfree queue.
enum {ALLOCATE = 128};
public:
public:
TagStream()
: m_queuePtr(new boost::lockfree::queue<Tag>(ALLOCATE))
: m_queuePtr(new TagQueue)
{
boost::thread thread (&TagStream::startServer, this);
}
}
bool pop(Tag& tag)
{
......@@ -164,8 +182,8 @@ private:
{
TagServer server(m_queuePtr);
server.run();
}
}
private:
SharedQueuePtr m_queuePtr;
};
......@@ -184,13 +202,13 @@ CPluginTCPTagging::~CPluginTCPTagging()
{
}
void CPluginTCPTagging::startHook(const std::vector<OpenViBE::CString>& vSelectedChannelNames,
void CPluginTCPTagging::startHook(const std::vector<OpenViBE::CString>& vSelectedChannelNames,
OpenViBE::uint32 ui32SamplingFrequency, OpenViBE::uint32 ui32ChannelCount, OpenViBE::uint32 ui32SampleCountPerSentBlock)
{
// Get POSIX time (number of milliseconds since epoch)
timeb time_buffer;
ftime(&time_buffer);
uint64 posixTime = time_buffer.time*1000ULL + time_buffer.millitm;
timeb time_buffer;
ftime(&time_buffer);
uint64 posixTime = time_buffer.time*1000ULL + time_buffer.millitm;
// Initialize time counters.
m_previousPosixTime = posixTime;
......@@ -201,22 +219,22 @@ void CPluginTCPTagging::startHook(const std::vector<OpenViBE::CString>& vSelecte
while(tagStream.pop(tag));
}
void CPluginTCPTagging::loopHook(std::vector < std::vector < OpenViBE::float32 > >& /*vPendingBuffer*/,
void CPluginTCPTagging::loopHook(std::vector < std::vector < OpenViBE::float32 > >& /*vPendingBuffer*/,
OpenViBE::CStimulationSet& stimulationSet, uint64 start, uint64 end, uint64 sampleTime)
{
// Get POSIX time (number of milliseconds since epoch)
timeb time_buffer;
ftime(&time_buffer);
uint64 posixTime = time_buffer.time*1000ULL + time_buffer.millitm;
timeb time_buffer;
ftime(&time_buffer);
uint64 posixTime = time_buffer.time*1000ULL + time_buffer.millitm;
Tag tag;
Tag tag;
// Collect tags from the stream until exhaustion.
while(tagStream.pop(tag)) {
m_rKernelContext.getLogManager() << Kernel::LogLevel_Info << "New Tag received (" << tag.padding << ", " << tag.identifier << ", " << tag.timestamp << ")\n";
// Add 10 ms delay to reduce the risk of a race condition.
// The duration (dt) between the moment when the tag is timestamped and the moment when it is received by the acquisition server is generally small.
// Add 10 ms delay to reduce the risk of a race condition.
// The duration (dt) between the moment when the tag is timestamped and the moment when it is received by the acquisition server is generally small.
// Typically dt<1ms for localhost. Nonetheless it may still happen that timestamp + dt > m_previousPosixTime.
// This would result in a tag loss. The delay allows to compensate for dt. Its value is empirical and should be >= max(dt).
tag.timestamp += 10;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment