Commit 0ac8d1df authored by nfoy's avatar nfoy

fix TCP Tagging

parent d7ea3519
......@@ -25,8 +25,22 @@ CPluginTCPTagging::~CPluginTCPTagging()
void CPluginTCPTagging::startHook(const std::vector<OpenViBE::CString>& vSelectedChannelNames,
OpenViBE::uint32 ui32SamplingFrequency, OpenViBE::uint32 ui32ChannelCount, OpenViBE::uint32 ui32SampleCountPerSentBlock)
{
// get port from configuration
uint32 l_port=0;
for (unsigned i=0; i<m_port.length(); i++)
{
l_port*=10;
l_port+=m_port[i]-'0';
}
// initialize tag stream
m_scopedTagStream.reset(new CTagStream());
// this may throw exceptions, e.g. when the port is already in use.
try {
m_scopedTagStream.reset(new CTagStream(l_port));
}
catch (std::exception& e) {
m_rKernelContext.getLogManager() << Kernel::LogLevel_Error << "Could not create tag stream: " << e.what();
}
// Get POSIX time (number of milliseconds since epoch)
timeb time_buffer;
......@@ -54,7 +68,7 @@ void CPluginTCPTagging::loopHook(std::vector < std::vector < OpenViBE::float32 >
Tag tag;
// Collect tags from the stream until exhaustion.
while(m_scopedTagStream->pop(tag)) {
while(m_scopedTagStream.get() && m_scopedTagStream->pop(tag)) {
m_rKernelContext.getLogManager() << Kernel::LogLevel_Info << "New Tag received (" << tag.padding << ", " << tag.identifier << ", " << tag.timestamp << ") at " << posixTime << " (posix time in ms)\n";
// Check that the timestamp fits the current chunk.
......
......@@ -81,16 +81,24 @@ void CTagServer::run()
m_ioService.run();
}
catch(std::exception& e) {
// TODO: log error message
// TODO: log error message (needs to be thread-safe)
}
}
CTagServer::~CTagServer()
{
}
void CTagServer::stop()
{
m_ioService.stop();
}
void CTagServer::startAccept()
{
SharedSessionPtr newSession (new CTagSession(m_ioService, m_queuePtr));
// Note: if this instance of TaggingSever is destroyed then the associated io_service is destroyed as well.
// Therefore the call-back will never be called if this instance is destroyed and it is safe to use this instead of shared pointer.
// Note: if this instance of CTagSever is destroyed then the associated io_service is destroyed as well.
// Therefore the call-back will never be called if this instance is destroyed and it is safe to use this instead of a shared pointer.
m_acceptor.async_accept(newSession->socket(), boost::bind(&CTagServer::handleAccept, this, newSession, _1));
}
......@@ -108,7 +116,9 @@ void CTagServer::handleAccept(SharedSessionPtr session, const boost::system::err
CTagStream::CTagStream(int port)
: m_queuePtr(new CTagQueue), m_port(port)
{
boost::thread thread (&CTagStream::startServer, this);
// can throw exceptions, e.g. when the port is already in use.
m_serverPtr.reset(new CTagServer(m_queuePtr, m_port));
m_threadPtr.reset(new boost::thread(&CTagStream::startServer, this));
}
......@@ -120,6 +130,12 @@ bool CTagStream::pop(Tag &tag)
void CTagStream::startServer()
{
CTagServer server(m_queuePtr, m_port);
server.run();
m_serverPtr->run();
}
CTagStream::~CTagStream()
{
// m_serverPtr and m_threadPtr cannot be null
m_serverPtr->stop();
m_threadPtr->join();
}
#ifndef __OpenViBE_AcquisitionServer_TCPTagSession_H__
#define __OpenViBE_AcquisitionServer_TCPTagSession_H__
#include <set>
#include <queue>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
// PluginTCPTagging relies on four auxilliary classes: TagQueue, TagSession, TagServer and TagStream.
// TagQueue implements a trivial queue to store tags with exclusive locking.
// TagServer implements a server that simply binds to a port and waits for incoming connections.
// TagSession represents an individual connection with a client and holds a connection handle (socket)
// PluginTCPTagging relies on four auxilliary classes: CTagQueue, CTagSession, CTagServer and CTagStream.
// CTagQueue implements a trivial queue to store tags with exclusive locking.
// CTagServer implements a server that simply binds to a port and waits for incoming connections.
// CTagSession represents an individual connection with a client and holds a connection handle (socket)
// and a data buffer to store incoming data.
// The use of shared pointers is instrumental to ensure that instances are still alive when call-backs are
// called and avoid memory corruption.
// The TagStream class implements a stream to allow to collect tags. Upon instantiation, it creates an instance
// of TagServer and starts the server in an auxilliary thread.
// The CTagStream class implements a stream to allow to collect tags. Upon instantiation, it creates an instance
// of CTagServer and starts the server in an auxilliary thread.
// The exchange of data between the main tread and the auxilliary thread is performed via a lockfree queue (boost).
namespace OpenViBEAcquisitionServer
......@@ -33,11 +33,14 @@ struct Tag
unsigned long long padding, identifier, timestamp;
};
class CTagSession; // forward declaration of TagSession to define SharedSessionPtr
class CTagQueue; // forward declaration of TagQueue to define SharedQueuePtr
class CTagSession; // forward declaration of CTagSession to define SharedSessionPtr
class CTagQueue; // forward declaration of CTagQueue to define SharedQueuePtr
class CTagServer; // forward declaration of CTagServer to define ScopedServerPtr
typedef boost::shared_ptr<CTagQueue> SharedQueuePtr;
typedef boost::shared_ptr<CTagSession> SharedSessionPtr;
typedef boost::scoped_ptr<CTagServer> ScopedServerPtr;
typedef boost::scoped_ptr<boost::thread> ScopedThreadPtr;
// A trivial implementation of a queue to store Tags with exclusive locking
class CTagQueue
......@@ -55,7 +58,7 @@ private:
boost::mutex m_mutex;
};
// An instance of TagSession is associated to every client connecting to the Tagging Server.
// An instance of CTagSession is associated to every client connecting to the Tagging Server.
// It contains a connection handle and data buffer.
class CTagSession : public boost::enable_shared_from_this<CTagSession>
{
......@@ -76,14 +79,15 @@ private:
SharedQueuePtr m_queuePtr;
};
// TagServer implements a server that binds to a port and accepts new connections.
// It also has a field sessionSet that holds shared pointers to all exisiting sessions
// CTagServer implements a server that binds to a port and accepts new connections.
class CTagServer
{
public:
CTagServer(const SharedQueuePtr& queue, int port = 15361);
~CTagServer();
void run();
void stop();
private:
void startAccept();
......@@ -96,7 +100,7 @@ private:
const SharedQueuePtr& m_queuePtr;
};
// TagStream allows to collect tags received via TCP.
// CTagStream allows to collect tags received via TCP.
class CTagStream
{
// Initial memory allocation of lockfree queue.
......@@ -104,6 +108,7 @@ class CTagStream
public:
CTagStream(int port = 15361);
~CTagStream();
bool pop(Tag& tag);
......@@ -112,6 +117,8 @@ private:
private:
SharedQueuePtr m_queuePtr;
ScopedServerPtr m_serverPtr;
ScopedThreadPtr m_threadPtr;
int m_port;
};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment