ovasCPluginTCPTagging.cpp 7.57 KB
Newer Older
nfoy's avatar
nfoy committed
1
2
3
#include "ovasCPluginTCPTagging.h"

#include <sys/timeb.h>

#include <exception>
#include <iostream>
#include <queue>
#include <set>

#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/asio.hpp>
#include <boost/thread/thread.hpp>

// Implementation of PluginTCPTagging.
nfoy's avatar
nfoy committed
13
14
// The plugin relies on four auxiliary classes: TagQueue, TagSession, TagServer and TagStream.
// TagQueue implements a trivial queue to store tags with exclusive locking.
nfoy's avatar
nfoy committed
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// TagServer implements a server that simply binds to a port and waits for incoming connections.
// TagSession represents an individual connection with a client and holds a connection handle (socket)
// and a data buffer to store incoming data.
// The use of shared pointers is instrumental to ensure that instances are still alive when call-backs are
// called and avoid memory corruption.
// The TagStream class implements a stream from which tags can be collected. Upon instantiation, it starts
// an instance of TagServer in an auxiliary thread.
// The exchange of data between the main thread and the auxiliary thread is performed via the mutex-protected TagQueue.

using boost::asio::ip::tcp;
using namespace OpenViBE;
using namespace OpenViBEAcquisitionServer;
using namespace OpenViBEAcquisitionServerPlugins;

// A Tag consists of an identifier to inform about the type of event
// and a timestamp corresponding to the time at which the event occurs.
// Tags are read from the network directly into this structure, so the order and size
// of the three fields define the layout of the bytes expected from a client.
struct Tag
{
	uint64 padding;
	uint64 identifier;
	uint64 timestamp;
};

nfoy's avatar
nfoy committed
36
37
class TagSession; // forward declaration of TagSession to define SharedSessionPtr
class TagQueue; // forward declaration of TagQueue to define SharedQueuePtr
nfoy's avatar
nfoy committed
38

nfoy's avatar
nfoy committed
39
typedef boost::shared_ptr<TagQueue> SharedQueuePtr;
nfoy's avatar
nfoy committed
40
typedef boost::shared_ptr<TagSession> SharedSessionPtr;
nfoy's avatar
nfoy committed
41

nfoy's avatar
nfoy committed
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
// A trivial implementation of a queue to store Tags with exclusive locking
class TagQueue
{
public:
	TagQueue()
	{
	}

	void push(const Tag& tag)
	{
		boost::lock_guard<boost::mutex> guard(m_mutex);
		m_queue.push(tag);
	}

	bool pop(Tag& tag)
	{
		boost::lock_guard<boost::mutex> guard(m_mutex);
		if (m_queue.size()==0) return false;
		else {
			tag = m_queue.front();
			m_queue.pop();
			return true;
		}
	}
private:
	std::queue<Tag> m_queue;
	boost::mutex m_mutex;
};

nfoy's avatar
nfoy committed
71
72
73
74
75
// One TagSession exists per client connected to the Tagging Server.
// It holds the connection socket, a receive buffer large enough for exactly one Tag,
// and a shared handle on the queue into which received tags are pushed.
class TagSession : public boost::enable_shared_from_this<TagSession>
{
public:
	TagSession(boost::asio::io_service& io_service, const SharedQueuePtr& queue)
		: m_socket(io_service), m_queuePtr(queue)
	{
	}

	// Gives access to the socket, so that an acceptor can attach an incoming connection to it.
	tcp::socket& socket()
	{
		return m_socket;
	}

	// Begins receiving tags from the client.
	void start()
	{
		startRead();
	}

	// Schedules an asynchronous read of exactly sizeof(Tag) bytes into the receive buffer;
	// handleRead is invoked on completion or failure.
	void startRead()
	{
		// Caveat: the handler is bound to a shared pointer (rather than to this) so that this
		// instance of TagSession is guaranteed to be alive when the call-back is invoked.
		boost::asio::async_read(
			m_socket,
			boost::asio::buffer(static_cast<void*>(&m_tag), sizeof(Tag)),
			boost::bind(&TagSession::handleRead, shared_from_this(), boost::asio::placeholders::error));
	}

	// Completion handler of startRead: timestamps the tag if needed, queues it and reads the next one.
	void handleRead(const boost::system::error_code& error)
	{
		if (error) {
			// The read failed: no further read is scheduled for this session.
			return;
		}

		// A timestamp of 0 is replaced by the current POSIX time (milliseconds since epoch).
		if (m_tag.timestamp==0) {
			timeb now;
			ftime(&now);
			m_tag.timestamp = now.time*1000ULL + now.millitm;
		}

		// Hand the tag over to the queue, then wait for the next one.
		m_queuePtr->push(m_tag);
		startRead();
	}

private:
	Tag m_tag;
	tcp::socket m_socket;
	SharedQueuePtr m_queuePtr;
};

// TagServer implements a server that binds to a port and accepts new connections.
nfoy's avatar
nfoy committed
126
// It also has a field sessionSet that holds shared pointers to all exisiting sessions
nfoy's avatar
nfoy committed
127
128
129
130
131
132
class TagServer
{
public:
	// Server port.
	enum {PORT = 15361};

nfoy's avatar
nfoy committed
133
	TagServer(const SharedQueuePtr& queue)
nfoy's avatar
nfoy committed
134
135
136
137
138
139
140
141
142
143
144
		: m_ioService(), m_acceptor(m_ioService, tcp::endpoint(tcp::v4(), PORT)), m_queuePtr(queue)
	{
	}

	void run()
	{
		try {
			startAccept();
			m_ioService.run();
		}
		catch(std::exception& e) {
nfoy's avatar
nfoy committed
145
146
			// TODO: log error message
		}
nfoy's avatar
nfoy committed
147
148
149
150
151
	}

private:
	void startAccept()
	{
nfoy's avatar
nfoy committed
152
		SharedSessionPtr newSession (new TagSession(m_ioService, m_queuePtr));
nfoy's avatar
nfoy committed
153
		// Note: if this instance of TaggingSever is destroyed then the associated io_service is destroyed as well.
nfoy's avatar
nfoy committed
154
		// Therefore the call-back will never be called if this instance is destroyed and it is safe to use this instead of shared pointer.
nfoy's avatar
nfoy committed
155
156
157
		m_acceptor.async_accept(newSession->socket(), boost::bind(&TagServer::handleAccept, this, newSession, _1));
	}

nfoy's avatar
nfoy committed
158
	void handleAccept(SharedSessionPtr session, const boost::system::error_code& error)
nfoy's avatar
nfoy committed
159
160
161
162
163
164
165
166
167
168
169
	{
		if (!error) {
			session->start();
		}

		startAccept();
	}

private:
	boost::asio::io_service m_ioService;
	tcp::acceptor m_acceptor;
nfoy's avatar
nfoy committed
170
	const SharedQueuePtr& m_queuePtr;
nfoy's avatar
nfoy committed
171
172
173
174
175
176
177
178
};

// TagStream allows to collect tags received via TCP.
class TagStream
{
	// Initial memory allocation of lockfree queue.
	enum {ALLOCATE = 128};

nfoy's avatar
nfoy committed
179
public:	   
nfoy's avatar
nfoy committed
180
	TagStream()
nfoy's avatar
nfoy committed
181
		: m_queuePtr(new TagQueue)
nfoy's avatar
nfoy committed
182
183
	{
		boost::thread thread (&TagStream::startServer, this);
nfoy's avatar
nfoy committed
184
	}	 
nfoy's avatar
nfoy committed
185
186
187
188
189
190
191
192
193
194
195

	bool pop(Tag& tag)
	{
		return m_queuePtr->pop(tag);
	}

private:
	void startServer()
	{
		TagServer server(m_queuePtr);
		server.run();
nfoy's avatar
nfoy committed
196
197
	}			 
				
nfoy's avatar
nfoy committed
198
private:
nfoy's avatar
nfoy committed
199
	SharedQueuePtr m_queuePtr;
nfoy's avatar
nfoy committed
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
};

// File-scope tag stream used by the plugin hooks below. Its constructor runs during static
// initialization and starts the thread that runs the tag server.
static TagStream tagStream;

// CPluginTCPTagging implementation

// Constructs the plugin under the name "AcquisitionServer_Plugin_TCPTagging"
// and logs (at Info level) that it is being loaded.
CPluginTCPTagging::CPluginTCPTagging(const OpenViBE::Kernel::IKernelContext& rKernelContext)
	: IAcquisitionServerPlugin(rKernelContext, OpenViBE::CString("AcquisitionServer_Plugin_TCPTagging"))
{
	m_rKernelContext.getLogManager()
		<< Kernel::LogLevel_Info
		<< "Loading plugin: TCP Tagging\n";
}

// Nothing to release here: the plugin performs no clean-up of its own.
CPluginTCPTagging::~CPluginTCPTagging()
{
	// Intentionally empty.
}

nfoy's avatar
nfoy committed
216
// Resets the time references used by loopHook and discards every tag that is still pending
// in the tag stream. None of the parameters is used.
void CPluginTCPTagging::startHook(const std::vector<OpenViBE::CString>& vSelectedChannelNames,
	OpenViBE::uint32 ui32SamplingFrequency, OpenViBE::uint32 ui32ChannelCount, OpenViBE::uint32 ui32SampleCountPerSentBlock)
{
	// Current POSIX time (number of milliseconds since epoch).
	timeb now;
	ftime(&now);
	const uint64 posixNow = now.time*1000ULL + now.millitm;

	// Initialize time counters: the sample time restarts from 0 at the current POSIX time.
	m_previousSampleTime = 0;
	m_previousPosixTime = posixNow;

	// Clear the tag stream: pop until it reports that it is empty.
	Tag discarded;
	while(tagStream.pop(discarded)) {
	}
}

nfoy's avatar
nfoy committed
233
void CPluginTCPTagging::loopHook(std::vector < std::vector < OpenViBE::float32 > >& /*vPendingBuffer*/,
nfoy's avatar
nfoy committed
234
235
236
	OpenViBE::CStimulationSet& stimulationSet, uint64 start, uint64 end, uint64 sampleTime)
{
	// Get POSIX time (number of milliseconds since epoch)
nfoy's avatar
nfoy committed
237
238
239
	timeb time_buffer;
	ftime(&time_buffer);
	uint64 posixTime = time_buffer.time*1000ULL + time_buffer.millitm;
nfoy's avatar
nfoy committed
240

nfoy's avatar
nfoy committed
241
	Tag tag;
nfoy's avatar
nfoy committed
242
243
244

	// Collect tags from the stream until exhaustion.
	while(tagStream.pop(tag)) {
245
		m_rKernelContext.getLogManager() << Kernel::LogLevel_Trace << "New Tag received (" << tag.padding << ", " << tag.identifier << ", " << tag.timestamp << ") at " << posixTime << " (posix time in ms)\n";
246

247
		// Check that the timestamp fits the current chunk.
248
		if (tag.timestamp < m_previousPosixTime) {
249
			m_rKernelContext.getLogManager() << Kernel::LogLevel_Trace << "The Tag has arrived before the beginning of the current chunk and will be inserted at the beginning of this chunk\n";
250
			tag.timestamp = m_previousPosixTime;
251
252
		}

nfoy's avatar
nfoy committed
253
		// Marker time correction (simple local linear interpolation).
254
255
256
		if (m_previousPosixTime != posixTime) {
			tag.timestamp = m_previousSampleTime + (tag.timestamp - m_previousPosixTime)*((sampleTime - m_previousSampleTime) / (posixTime - m_previousPosixTime));
		}
nfoy's avatar
nfoy committed
257
258

		// Insert tag into the stimulation set.
nfoy's avatar
nfoy committed
259
		stimulationSet.appendStimulation(tag.identifier, tag.timestamp, 0 /* duration of tag (ms) */);
nfoy's avatar
nfoy committed
260
261
262
263
264
265
	}

	// Update time counters.
	m_previousPosixTime = posixTime;
	m_previousSampleTime = sampleTime;
}