Commit 5f2658d2 authored by Jussi Lindgren's avatar Jussi Lindgren

Server: Fixes to TCP Tagging

- ovCTime steady clock is now used for the tagging with 32:32 fixed point time
- The tagging precision can now be sub-millisecond accuracy (not guaranteed)
- If clients provide a non-zero timestamp, this must now be in 32:32 fp as well
- The client provided stamps must be relative to the same point as the server code
  (todo: currently disabled until SDK has an appropriate time getter)
- New flags are provided to request either client or server side tagging. The
  Tag sent over the network will now contain these flags in the first uint64.
- The default is now to put the stamp on client side and adjust on server side.
- The port number is now parsed as an uint32 on the server side, not string
- Simplified the code as allowed by moving to the fixed point time
parent 5abeb810
......@@ -40,6 +40,11 @@ namespace OpenViBEAcquisitionServer
*
* This hook is called before sending the stimulations or signal to the connected clients.
* It gets a reference to the current signal buffer and the stimulation set with its start and end dates.
*
* Note that the given input buffer may have more samples than what should be processed
* per iteration. All operations on vPendingBuffer done in the hook should only consider
* the first sampleCountSentPerBlock samples. The later samples should be left as-is
* and will be provided on the next call.
*/
virtual void loopHook(std::deque < std::vector < OpenViBE::float32 > >& vPendingBuffer,
OpenViBE::CStimulationSet& oStimulationSet,
......
......@@ -813,6 +813,12 @@ void CAcquisitionServerGUI::buttonPreferencePressedCB(::GtkButton* pButton)
l_pSettingControl = gtk_entry_new();
gtk_entry_append_text(GTK_ENTRY(l_pSettingControl), r->getData()->toASCIIString());
}
else if( const TypedProperty<uint32>* r = dynamic_cast< const TypedProperty<uint32>* >(l_pCurrentProperty))
{
// cout << "uinteger\n";
l_pSettingControl = gtk_spin_button_new_with_range((gdouble)std::numeric_limits<uint32>::min(), (gdouble)std::numeric_limits<uint32>::max(), 1.0);
gtk_spin_button_set_value(GTK_SPIN_BUTTON(l_pSettingControl), (gdouble)*(r->getData()));
}
else if( const TypedProperty<int64>* r = dynamic_cast< const TypedProperty<int64>* >(l_pCurrentProperty))
{
// cout << "integer\n";
......
#include "ovasCPluginTCPTagging.h"
#include <sys/timeb.h>
#include <system/ovCTime.h>
#include "../ovasCSettingsHelper.h"
#include "../ovasCSettingsHelperOperators.h"
// #define TCPTAGGING_DEBUG
#if defined(TCPTAGGING_DEBUG)
#include <iomanip>
#endif
using namespace OpenViBE;
using namespace OpenViBE::Kernel;
using namespace OpenViBEAcquisitionServer;
......@@ -12,7 +18,7 @@ using namespace OpenViBEAcquisitionServerPlugins;
CPluginTCPTagging::CPluginTCPTagging(const OpenViBE::Kernel::IKernelContext& rKernelContext)
: IAcquisitionServerPlugin(rKernelContext, "AcquisitionServer_Plugin_TCPTagging"),
m_port("15361")
m_port(15361)
{
m_rKernelContext.getLogManager() << Kernel::LogLevel_Info << "Loading plugin: TCP Tagging\n";
m_oSettingsHelper.add("TCP_Tagging_Port", &m_port);
......@@ -26,31 +32,19 @@ CPluginTCPTagging::~CPluginTCPTagging()
void CPluginTCPTagging::startHook(const std::vector<OpenViBE::CString>& vSelectedChannelNames,
OpenViBE::uint32 ui32SamplingFrequency, OpenViBE::uint32 ui32ChannelCount, OpenViBE::uint32 ui32SampleCountPerSentBlock)
{
// get port from configuration
uint32 l_port=0;
for (unsigned i=0; i<m_port.length(); i++)
{
l_port*=10;
l_port+=m_port[i]-'0';
}
// initialize tag stream
// this may throw exceptions, e.g. when the port is already in use.
try {
m_scopedTagStream.reset(new CTagStream(l_port));
m_scopedTagStream.reset(new CTagStream(m_port));
}
catch (std::exception& e) {
m_rKernelContext.getLogManager() << Kernel::LogLevel_Error << "Could not create tag stream: " << e.what();
}
// Get POSIX time (number of milliseconds since epoch)
timeb time_buffer;
ftime(&time_buffer);
uint64 posixTime = time_buffer.time*1000ULL + time_buffer.millitm;
// Initialize time counters.
m_previousPosixTime = posixTime;
m_previousClockTime = System::Time::zgetTime();
m_previousSampleTime = 0;
m_bWarningPrinted = false;
}
void CPluginTCPTagging::stopHook()
......@@ -58,36 +52,48 @@ void CPluginTCPTagging::stopHook()
m_scopedTagStream.reset();
}
// n.b. With this version of tcp tagging, all the timestamps are in fixed point
void CPluginTCPTagging::loopHook(std::deque < std::vector < OpenViBE::float32 > >& /*vPendingBuffer*/,
OpenViBE::CStimulationSet& stimulationSet, uint64 start, uint64 end, uint64 sampleTime)
OpenViBE::CStimulationSet& stimulationSet, uint64 /* start */, uint64 /* end */, uint64 lastSampleTime)
{
// Get POSIX time (number of milliseconds since epoch)
timeb time_buffer;
ftime(&time_buffer);
uint64 posixTime = time_buffer.time*1000ULL + time_buffer.millitm;
const uint64 clockTime = System::Time::zgetTime();
Tag tag;
// Collect tags from the stream until exhaustion.
while(m_scopedTagStream.get() && m_scopedTagStream->pop(tag)) {
m_rKernelContext.getLogManager() << Kernel::LogLevel_Trace << "New Tag received (" << tag.padding << ", " << tag.identifier << ", " << tag.timestamp << ") at " << posixTime << " (posix time in ms)\n";
m_rKernelContext.getLogManager() << Kernel::LogLevel_Trace << "New Tag received (" << tag.flags << ", " << tag.identifier << ", "
<< ITimeArithmetics::timeToSeconds(tag.timestamp) << "s) at "
<< ITimeArithmetics::timeToSeconds(clockTime) << "s\n";
// Check that the timestamp fits the current chunk.
if (tag.timestamp < m_previousPosixTime) {
if (tag.timestamp < m_previousClockTime) {
m_rKernelContext.getLogManager() << Kernel::LogLevel_Trace << "The Tag has arrived before the beginning of the current chunk and will be inserted at the beginning of this chunk\n";
tag.timestamp = m_previousPosixTime;
tag.timestamp = m_previousClockTime;
}
// Marker time correction (simple local linear interpolation).
if (m_previousPosixTime != posixTime) {
tag.timestamp = m_previousSampleTime + (tag.timestamp - m_previousPosixTime)*((sampleTime - m_previousSampleTime) / (posixTime - m_previousPosixTime));
}
const uint64 tagOffsetClock = tag.timestamp - m_previousClockTime; // How far in time the marker is from the last call to this function
const uint64 adjustedTagTime = m_previousSampleTime + tagOffsetClock; // Time since the beginning of the current buffer (as approx by time of the last sample of the prev. run)
#if defined(TCPTAGGING_DEBUG)
std::cout << "Set tag " << tag.identifier
<< " at " << std::setprecision(6) << ITimeArithmetics::timeToSeconds(adjustedTagTime)
<< " (prevS = " << ITimeArithmetics::timeToSeconds(m_previousSampleTime) << "s, "
<< " prevC = " << ITimeArithmetics::timeToSeconds(m_previousClockTime) << "s, "
<< " tag = " << ITimeArithmetics::timeToSeconds(tag.timestamp) << "s, "
<< " d = " << ITimeArithmetics::timeToSeconds(tagOffsetClock)*1000.0 << "ms)"
<< "\n";
#endif
// Insert tag into the stimulation set.
stimulationSet.appendStimulation(tag.identifier, tag.timestamp, 0 /* duration of tag (ms) */);
stimulationSet.appendStimulation(tag.identifier, adjustedTagTime, 0 /* duration of tag (ms) */);
}
// Update time counters.
m_previousPosixTime = posixTime;
m_previousSampleTime = sampleTime;
// Update time counters. Basically these counters allow to map the time a stamp was received to the time related to the sample buffers,
// as we know this function is called right after receiving samples from a device.
m_previousClockTime = clockTime;
m_previousSampleTime = lastSampleTime;
}
......@@ -45,10 +45,13 @@ namespace OpenViBEAcquisitionServer
OpenViBE::CStimulationSet& stimulationSet, OpenViBE::uint64 start, OpenViBE::uint64 end, OpenViBE::uint64 sampleTime);
private:
OpenViBE::uint64 m_previousPosixTime;
OpenViBE::uint64 m_previousClockTime;
OpenViBE::uint64 m_previousSampleTime;
std::unique_ptr<CTagStream> m_scopedTagStream;
OpenViBE::CString m_port;
OpenViBE::uint32 m_port;
bool m_bWarningPrinted;
};
......
#include "ovasCTagStream.h"
#include <sys/timeb.h>
#include <system/ovCTime.h>
#include <iostream>
#include <thread>
#include <mutex>
......@@ -28,7 +30,7 @@ bool CTagQueue::pop(Tag &tag)
CTagSession::CTagSession(boost::asio::io_service &io_service, const SharedQueuePtr &queue)
: m_socket(io_service), m_queuePtr(queue)
: m_socket(io_service), m_queuePtr(queue), m_errorState(0)
{
}
......@@ -39,6 +41,7 @@ tcp::socket &CTagSession::socket()
void CTagSession::start()
{
m_errorState = 0;
startRead();
}
......@@ -50,16 +53,33 @@ void CTagSession::startRead()
void CTagSession::handleRead(const boost::system::error_code &error)
{
if (!error) {
// If the timestamp is 0, set timestamp to current posix time.
if (m_tag.timestamp==0) {
// Get POSIX time (number of milliseconds since epoch)
timeb time_buffer;
ftime(&time_buffer);
unsigned long long posixTime = time_buffer.time*1000ULL + time_buffer.millitm;
// edit timestamp
m_tag.timestamp=posixTime;
if (!error)
{
if(m_tag.timestamp == 0 || (m_tag.flags & TCP_Tagging_Flags::FLAG_AUTOSTAMP_SERVERSIDE))
{
// Client didn't provide timestamp or asked the server to do it. Stamp current time.
m_tag.timestamp = System::Time::zgetTime();
}
else if(!(m_tag.flags & TCP_Tagging_Flags::FLAG_FPTIME))
{
// Client provided stamp but not in FPTIME
m_tag.timestamp = System::Time::zgetTime();
if(!(m_errorState & (1LL<<1)))
{
// @fixme not appropriate to print errors from a thread, but better than silent fail
std::cout << "[WARNING] TCP Tagging: Received tag(s) not in fixed point time. Not supported, will replace with server time.\n";
m_errorState |= (1LL<<1);
}
}
else if(m_tag.flags & TCP_Tagging_Flags::FLAG_AUTOSTAMP_CLIENTSIDE)
{
// @FIXME remove this branch after zgetTimeRaw(false) becomes available in SDK; fix CStimulusSender.cpp and all refs to zgetTime() in tagging context to use the raw version.
m_tag.timestamp = System::Time::zgetTime();
if(!(m_errorState & (1LL<<2)))
{
std::cout << "[WARNING] TCP Tagging: Client side stamping not currently supported. Using server receiving time for tags.\n";
m_errorState |= (1LL<<2);
}
}
// Push tag to the queue.
......
......@@ -29,7 +29,14 @@ namespace OpenViBEAcquisitionServerPlugins
// and a timestamp corresponding to the time at which the event occurrs.
struct Tag
{
uint64_t padding, identifier, timestamp;
uint64_t flags, identifier, timestamp;
};
// Note: duplicated in TCP Tagging module in openvibe
enum TCP_Tagging_Flags {
FLAG_FPTIME = (1LL << 0), // The time given is fixed point time.
FLAG_AUTOSTAMP_CLIENTSIDE = (1LL << 1), // Ignore given stamp, bake timestamp on client side before sending
FLAG_AUTOSTAMP_SERVERSIDE = (1LL << 2) // Ignore given stamp, bake timestamp on server side when receiving
};
class CTagSession; // forward declaration of CTagSession to define SharedSessionPtr
......@@ -44,59 +51,60 @@ typedef std::unique_ptr<std::thread> ScopedThreadPtr;
// A trivial implementation of a queue to store Tags with exclusive locking
class CTagQueue
{
public:
CTagQueue()
{
}
public:
CTagQueue()
{
}
void push(const Tag& tag);
void push(const Tag& tag);
bool pop(Tag& tag);
private:
std::queue<Tag> m_queue;
std::mutex m_mutex;
bool pop(Tag& tag);
private:
std::queue<Tag> m_queue;
std::mutex m_mutex;
};
// An instance of CTagSession is associated to every client connecting to the Tagging Server.
// It contains a connection handle and data buffer.
class CTagSession : public std::enable_shared_from_this<CTagSession>
{
public:
CTagSession(boost::asio::io_service& io_service, const SharedQueuePtr& queue);
public:
CTagSession(boost::asio::io_service& io_service, const SharedQueuePtr& queue);
boost::asio::ip::tcp::socket& socket();
boost::asio::ip::tcp::socket& socket();
void start();
void start();
void startRead();
void startRead();
void handleRead(const boost::system::error_code& error);
void handleRead(const boost::system::error_code& error);
private:
Tag m_tag;
boost::asio::ip::tcp::socket m_socket;
SharedQueuePtr m_queuePtr;
};
private:
Tag m_tag;
boost::asio::ip::tcp::socket m_socket;
SharedQueuePtr m_queuePtr;
uint64_t m_errorState;
};
// CTagServer implements a server that binds to a port and accepts new connections.
class CTagServer
{
public:
CTagServer(const SharedQueuePtr& queue, int port = 15361);
~CTagServer();
public:
CTagServer(const SharedQueuePtr& queue, int port = 15361);
~CTagServer();
void run();
void stop();
void run();
void stop();
private:
void startAccept();
private:
void startAccept();
void handleAccept(SharedSessionPtr session, const boost::system::error_code& error);
void handleAccept(SharedSessionPtr session, const boost::system::error_code& error);
private:
boost::asio::io_service m_ioService;
boost::asio::ip::tcp::acceptor m_acceptor;
const SharedQueuePtr& m_queuePtr;
private:
boost::asio::io_service m_ioService;
boost::asio::ip::tcp::acceptor m_acceptor;
const SharedQueuePtr& m_queuePtr;
};
// CTagStream allows to collect tags received via TCP.
......@@ -105,20 +113,20 @@ class CTagStream
// Initial memory allocation of lockfree queue.
enum {ALLOCATE = 128};
public:
CTagStream(int port = 15361);
~CTagStream();
public:
CTagStream(int port = 15361);
~CTagStream();
bool pop(Tag& tag);
bool pop(Tag& tag);
private:
void startServer();
private:
void startServer();
private:
SharedQueuePtr m_queuePtr;
ScopedServerPtr m_serverPtr;
ScopedThreadPtr m_threadPtr;
int m_port;
private:
SharedQueuePtr m_queuePtr;
ScopedServerPtr m_serverPtr;
ScopedThreadPtr m_threadPtr;
int m_port;
};
}
......
......@@ -11,6 +11,7 @@ INCLUDE_DIRECTORIES(../)
ADD_EXECUTABLE(${PROJECT_NAME} test_tagstream.cpp ../ovasCTagStream.cpp)
INCLUDE("FindOpenViBE")
INCLUDE("FindOpenViBEModuleSystem") # Time getter from here in the future
INCLUDE("FindThirdPartyBoost")
INCLUDE("FindThirdPartyBoost_System")
INCLUDE("FindThirdPartyBoost_Thread")
......
......@@ -17,6 +17,7 @@ SET_TARGET_PROPERTIES(${PROJECT_NAME} PROPERTIES
SET(INCLUDED_OV_SDK_COMPONENTS COMMON)
INCLUDE(AddOpenViBESDKComponents)
INCLUDE("FindOpenViBEModuleSystem")
INCLUDE("FindThirdPartyBoost")
INCLUDE("FindThirdPartyBoost_System") # ASIO
......
......@@ -25,10 +25,11 @@ namespace TCPTagging
virtual ~CStimulusSender();
// Connect to the TCP Tagging plugin of the Acquisition Server
virtual TCPTagging::boolean connect(const char* sAddress, const char* sStimulusPort);
bool connect(const char* sAddress, const char* sStimulusPort) override;
// Send a stimulation. Set Timestamp to 0 for immediate tagging (also the default).
virtual TCPTagging::boolean sendStimulation(TCPTagging::uint64 ui64Stimulation, TCPTagging::uint64 ui64Timestamp = 0) ;
// Send a stimulation.
bool sendStimulation(uint64_t ui64Stimulation, uint64_t ui64Timestamp = 0,
uint64_t ui64Flags = (FLAG_FPTIME | FLAG_AUTOSTAMP_CLIENTSIDE) ) override;
protected:
......
......@@ -14,14 +14,25 @@ namespace TCPTagging
public:
// Connect to the TCP Tagging plugin of the Acquisition Server
virtual TCPTagging::boolean connect(const char* sAddress, const char* sStimulusPort) = 0;
virtual bool connect(const char* sAddress, const char* sStimulusPort) = 0;
// Send a stimulation. Set Timestamp to 0 for immediate tagging (also the default).
virtual TCPTagging::boolean sendStimulation(TCPTagging::uint64 ui64Stimulation, TCPTagging::uint64 ui64Timestamp = 0) = 0;
// Send a stimulation.
// Or flags with FLAG_FPTIME if the provided time is fixed point.
// Or flags with FLAG_AUTOSTAMP_CLIENTSIDE to set the latest timestamp before sending. Then, ui64Timestamp is ignored.
// Or flags with FLAG_AUTOSTAMP_SERVERSIDE to request these server to stamp on receiveing. Then, ui64Timestamp is ignored.
virtual bool sendStimulation(uint64_t ui64Stimulation, uint64_t ui64Timestamp = 0,
uint64_t ui64Flags = (FLAG_FPTIME | FLAG_AUTOSTAMP_CLIENTSIDE)) = 0;
// To allow derived class' destructor to be called
virtual ~IStimulusSender(void) { }
// Note: duplicated in TCP Tagging plugin in AS
enum TCP_Tagging_Flags {
FLAG_FPTIME = (1LL << 0), // The time given is fixed point time.
FLAG_AUTOSTAMP_CLIENTSIDE = (1LL << 1), // Ignore given stamp, bake timestamp on client side before sending
FLAG_AUTOSTAMP_SERVERSIDE = (1LL << 2) // Ignore given stamp, bake timestamp on server side when receiving
};
};
// Clients are constructed via this call.
......
......@@ -5,6 +5,8 @@
#include <boost/array.hpp>
#include <sys/timeb.h>
#include <system/ovCTime.h>
using namespace TCPTagging;
using boost::asio::ip::tcp;
......@@ -49,7 +51,7 @@ boolean CStimulusSender::connect(const char* sAddress, const char* sStimulusPort
return true;
}
boolean CStimulusSender::sendStimulation(uint64 ui64Stimulation, uint64 ui64Timestamp /* = 0 */)
boolean CStimulusSender::sendStimulation(uint64_t ui64Stimulation, uint64_t ui64Timestamp /* = 0 */, uint64_t ui64Flags /* = FPTIME|CLIENTSIDE */)
{
if(!m_bConnectedOnce) {
return false;
......@@ -61,10 +63,16 @@ boolean CStimulusSender::sendStimulation(uint64 ui64Stimulation, uint64 ui64Time
return false;
}
uint64 l_ui64Zero = 0;
if(ui64Flags & IStimulusSender::TCP_Tagging_Flags::FLAG_AUTOSTAMP_CLIENTSIDE)
{
// @FIXME change to getter that has same starting point as the corresponding server getter
ui64Timestamp = System::Time::zgetTime();
ui64Flags |= IStimulusSender::TCP_Tagging_Flags::FLAG_FPTIME;
}
try
{
boost::asio::write(m_oStimulusSocket, boost::asio::buffer((void *)&l_ui64Zero, sizeof(uint64)));
boost::asio::write(m_oStimulusSocket, boost::asio::buffer((void *)&ui64Flags, sizeof(uint64)));
boost::asio::write(m_oStimulusSocket, boost::asio::buffer((void *)&ui64Stimulation, sizeof(uint64)));
boost::asio::write(m_oStimulusSocket, boost::asio::buffer((void *)&ui64Timestamp, sizeof(uint64)));
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment