Commit 5808e2c6 authored by nfoy's avatar nfoy
Browse files

add client example

change Queue/SessionPtr to SharedQueue/SessionPtr
improve explanation for the 10ms delay
handle endianness issues
parent 7746186f
# Example of tcp tagging client
import sys
import socket
from time import time, sleep
# host and port of tcp tagging server
HOST = ''
PORT = 15361
# Event identifier
EVENT_ID = 5+0x8100
# Artificial delay (ms). It may need to be increased if the time to send the tag is too long and causes tag loss.
# transform a value into an array of byte values in little-endian order.
def to_byte(value, length):
for x in range(length):
yield value%256
# connect
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))
for i in range(100):
# create the three pieces of the tag, padding, event_id and timestamp
event_id=list(to_byte(EVENT_ID, 8))
timestamp=list(to_byte(int(time()*1000)+DELAY, 8))
# transform from little endian to big endian
# send tag and sleep
......@@ -10,6 +10,13 @@
#include <boost/lockfree/queue.hpp>
#include <sys/timeb.h>
#if defined TARGET_OS_Linux
#include <endian.h>
#elif defined TARGET_OS_Windows
#include <winsock2.h>
#define be64toh(x) htonll(x)
// Implementation of PluginTCPTagging.
// The plugin relies on three auxilliary classes: TagSession, TagServer and TagStream.
// TagServer implements a server that simply binds to a port and waits for incoming connections.
......@@ -33,17 +40,17 @@ struct Tag
uint64 padding, identifier, timestamp;
class TagSession; // forward declaration of Tagging Session to define SessionPtr
class TagSession; // forward declaration of Tagging Session to define SharedSessionPtr
typedef boost::shared_ptr<boost::lockfree::queue<Tag> > QueuePtr;
typedef boost::shared_ptr<TagSession> SessionPtr;
typedef boost::shared_ptr<boost::lockfree::queue<Tag> > SharedQueuePtr;
typedef boost::shared_ptr<TagSession> SharedSessionPtr;
// An instance of TagSession is associated to every client connecting to the Tagging Server.
// It contains a connection handle and data buffer.
class TagSession : public boost::enable_shared_from_this<TagSession>
TagSession(boost::asio::io_service& io_service, std::set<SessionPtr>& sessionSet, const QueuePtr& queue)
TagSession(boost::asio::io_service& io_service, std::set<SharedSessionPtr>& sessionSet, const SharedQueuePtr& queue)
: m_socket(io_service), m_sessionSet(sessionSet), m_queuePtr(queue)
......@@ -68,6 +75,11 @@ public:
void handleRead(const boost::system::error_code& error)
if (!error) {
// Correct endianness
m_tag.padding = be64toh(m_tag.padding);
m_tag.identifier = be64toh(m_tag.identifier);
m_tag.timestamp = be64toh(m_tag.timestamp);
// Continue reading.
......@@ -81,18 +93,19 @@ public:
Tag m_tag;
tcp::socket m_socket;
std::set<SessionPtr>& m_sessionSet;
QueuePtr m_queuePtr;
std::set<SharedSessionPtr>& m_sessionSet;
SharedQueuePtr m_queuePtr;
// TagServer implements a server that binds to a port and accepts new connections.
// It also has a field sessionSet that holds shared pointers to all exisiting sessions
class TagServer
// Server port.
enum {PORT = 15361};
TagServer(const QueuePtr& queue)
TagServer(const SharedQueuePtr& queue)
: m_ioService(), m_acceptor(m_ioService, tcp::endpoint(tcp::v4(), PORT)), m_queuePtr(queue)
......@@ -111,13 +124,13 @@ public:
void startAccept()
SessionPtr newSession (new TagSession(m_ioService, m_sessionSet, m_queuePtr));
SharedSessionPtr newSession (new TagSession(m_ioService, m_sessionSet, m_queuePtr));
// Note: if this instance of TaggingSever is destroyed then the associated io_service is destroyed as well.
// Therefore the call-back will never be called if this instance is destroyed and it is safe to use this instead of shared pointer.
m_acceptor.async_accept(newSession->socket(), boost::bind(&TagServer::handleAccept, this, newSession, _1));
void handleAccept(SessionPtr session, const boost::system::error_code& error)
void handleAccept(SharedSessionPtr session, const boost::system::error_code& error)
if (!error) {
......@@ -129,8 +142,8 @@ private:
boost::asio::io_service m_ioService;
tcp::acceptor m_acceptor;
std::set<SessionPtr> m_sessionSet;
const QueuePtr& m_queuePtr;
std::set<SharedSessionPtr> m_sessionSet;
const SharedQueuePtr& m_queuePtr;
// TagStream allows to collect tags received via TCP.
......@@ -159,7 +172,7 @@ private:
QueuePtr m_queuePtr;
SharedQueuePtr m_queuePtr;
static TagStream tagStream;
......@@ -207,10 +220,13 @@ void CPluginTCPTagging::loopHook(std::vector < std::vector < OpenViBE::float32 >
while(tagStream.pop(tag)) {
m_rKernelContext.getLogManager() << Kernel::LogLevel_Info << "New Tag received (" << tag.padding << ", " << tag.identifier << ", " << tag.timestamp << ")\n";
// add 10 ms delay to ensure that the timestamp fits in the time frame when it arrives just before updating the time counters
// Add 10 ms delay to reduce the risk of a race condition.
// The duration (dt) between the moment when the tag is timestamped and the moment when it is received by the acquisition server is generally small.
// Typically dt<1ms for localhost. Nonetheless it may still happen that timestamp + dt > m_previousPosixTime.
// This would result in a tag loss. The delay allows to compensate for dt. Its value is empirical and should be >= max(dt).
tag.timestamp += 10;
// check that the timestamp fits in the time frame
// Check that the timestamp fits in the time frame.
if (tag.timestamp < m_previousPosixTime) {
m_rKernelContext.getLogManager() << Kernel::LogLevel_Error << "The Tag is discarded because it arrives too late to be inserted in the current signal block\n";
......@@ -222,7 +238,7 @@ void CPluginTCPTagging::loopHook(std::vector < std::vector < OpenViBE::float32 >
// Insert tag into the stimulation set.
stimulationSet.appendStimulation(tag.identifier, tag.timestamp, 40 /* duration of tag (ms) */);
stimulationSet.appendStimulation(tag.identifier, tag.timestamp, 0 /* duration of tag (ms) */);
// Update time counters.
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment