Commit d7ea3519 authored by nfoy's avatar nfoy

add ovasCTagStream files

parent 9477ef74
#include "ovasCTagStream.h"
#include <sys/timeb.h>
using namespace boost::asio::ip;
using namespace OpenViBEAcquisitionServer;
using namespace OpenViBEAcquisitionServerPlugins;
void CTagQueue::push(const Tag &tag)
{
boost::lock_guard<boost::mutex> guard(m_mutex);
m_queue.push(tag);
}
bool CTagQueue::pop(Tag &tag)
{
boost::lock_guard<boost::mutex> guard(m_mutex);
if (m_queue.size()==0) return false;
else {
tag = m_queue.front();
m_queue.pop();
return true;
}
}
CTagSession::CTagSession(boost::asio::io_service &io_service, const SharedQueuePtr &queue)
: m_socket(io_service), m_queuePtr(queue)
{
}
tcp::socket &CTagSession::socket()
{
return m_socket;
}
void CTagSession::start()
{
startRead();
}
void CTagSession::startRead()
{
// Caveat: a shared pointer is used (instead of simply using this) to ensure that this instance of TagSession is still alive when the callback is called.
boost::asio::async_read(m_socket, boost::asio::buffer((void *) &m_tag, sizeof(Tag)), boost::bind(&CTagSession::handleRead, shared_from_this(), _1));
}
void CTagSession::handleRead(const boost::system::error_code &error)
{
if (!error) {
// If the timestamp is 0, set timestamp to current posix time.
if (m_tag.timestamp==0) {
// Get POSIX time (number of milliseconds since epoch)
timeb time_buffer;
ftime(&time_buffer);
unsigned long long posixTime = time_buffer.time*1000ULL + time_buffer.millitm;
// edit timestamp
m_tag.timestamp=posixTime;
}
// Push tag to the queue.
m_queuePtr->push(m_tag);
// Continue reading.
startRead();
}
}
CTagServer::CTagServer(const SharedQueuePtr &queue, int port)
: m_ioService(), m_acceptor(m_ioService, tcp::endpoint(tcp::v4(), port)), m_queuePtr(queue)
{
}
void CTagServer::run()
{
try {
startAccept();
m_ioService.run();
}
catch(std::exception& e) {
// TODO: log error message
}
}
void CTagServer::startAccept()
{
SharedSessionPtr newSession (new CTagSession(m_ioService, m_queuePtr));
// Note: if this instance of TaggingSever is destroyed then the associated io_service is destroyed as well.
// Therefore the call-back will never be called if this instance is destroyed and it is safe to use this instead of shared pointer.
m_acceptor.async_accept(newSession->socket(), boost::bind(&CTagServer::handleAccept, this, newSession, _1));
}
void CTagServer::handleAccept(SharedSessionPtr session, const boost::system::error_code &error)
{
if (!error) {
session->start();
}
startAccept();
}
CTagStream::CTagStream(int port)
: m_queuePtr(new CTagQueue), m_port(port)
{
boost::thread thread (&CTagStream::startServer, this);
}
bool CTagStream::pop(Tag &tag)
{
return m_queuePtr->pop(tag);
}
void CTagStream::startServer()
{
CTagServer server(m_queuePtr, m_port);
server.run();
}
#ifndef __OpenViBE_AcquisitionServer_TCPTagSession_H__
#define __OpenViBE_AcquisitionServer_TCPTagSession_H__
#include <set>
#include <queue>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
// PluginTCPTagging relies on four auxilliary classes: TagQueue, TagSession, TagServer and TagStream.
// TagQueue implements a trivial queue to store tags with exclusive locking.
// TagServer implements a server that simply binds to a port and waits for incoming connections.
// TagSession represents an individual connection with a client and holds a connection handle (socket)
// and a data buffer to store incoming data.
// The use of shared pointers is instrumental to ensure that instances are still alive when call-backs are
// called and avoid memory corruption.
// The TagStream class implements a stream to allow to collect tags. Upon instantiation, it creates an instance
// of TagServer and starts the server in an auxilliary thread.
// The exchange of data between the main tread and the auxilliary thread is performed via a lockfree queue (boost).
namespace OpenViBEAcquisitionServer
{
namespace OpenViBEAcquisitionServerPlugins
{
// A Tag consists of an identifier to inform about the type of event
// and a timestamp corresponding to the time at which the event occurrs.
struct Tag
{
unsigned long long padding, identifier, timestamp;
};
class CTagSession; // forward declaration of TagSession to define SharedSessionPtr
class CTagQueue; // forward declaration of TagQueue to define SharedQueuePtr
typedef boost::shared_ptr<CTagQueue> SharedQueuePtr;
typedef boost::shared_ptr<CTagSession> SharedSessionPtr;
// A trivial implementation of a queue to store Tags with exclusive locking
class CTagQueue
{
public:
CTagQueue()
{
}
void push(const Tag& tag);
bool pop(Tag& tag);
private:
std::queue<Tag> m_queue;
boost::mutex m_mutex;
};
// An instance of TagSession is associated to every client connecting to the Tagging Server.
// It contains a connection handle and data buffer.
class CTagSession : public boost::enable_shared_from_this<CTagSession>
{
public:
CTagSession(boost::asio::io_service& io_service, const SharedQueuePtr& queue);
boost::asio::ip::tcp::socket& socket();
void start();
void startRead();
void handleRead(const boost::system::error_code& error);
private:
Tag m_tag;
boost::asio::ip::tcp::socket m_socket;
SharedQueuePtr m_queuePtr;
};
// TagServer implements a server that binds to a port and accepts new connections.
// It also has a field sessionSet that holds shared pointers to all exisiting sessions
class CTagServer
{
public:
CTagServer(const SharedQueuePtr& queue, int port = 15361);
void run();
private:
void startAccept();
void handleAccept(SharedSessionPtr session, const boost::system::error_code& error);
private:
boost::asio::io_service m_ioService;
boost::asio::ip::tcp::acceptor m_acceptor;
const SharedQueuePtr& m_queuePtr;
};
// TagStream allows to collect tags received via TCP.
class CTagStream
{
// Initial memory allocation of lockfree queue.
enum {ALLOCATE = 128};
public:
CTagStream(int port = 15361);
bool pop(Tag& tag);
private:
void startServer();
private:
SharedQueuePtr m_queuePtr;
int m_port;
};
}
}
#endif // __OpenViBE_AcquisitionServer_TCPTagSession_H__
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment