ovkCSchedulerMulticore.h 4.91 KB
Newer Older
1 2 3 4 5 6 7
#ifndef __OpenViBEKernel_Kernel_Scheduler_CSchedulerMulticore_H__
#define __OpenViBEKernel_Kernel_Scheduler_CSchedulerMulticore_H__

#include "ovkCScheduler.h"

#include <map>
#include <list>
8 9 10 11 12
#include <deque>

#include <boost/thread.hpp>
#include <boost/thread/condition.hpp>
#include <boost/version.hpp>
13 14 15 16 17 18 19 20 21

namespace OpenViBE
{
	namespace Kernel
	{
		class CSimulatedBox;
		class CChunk;
		class CPlayer;

22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
		// typedef std::pair<boolean (*)(void*), void*> jobInfo;
		typedef boost::function<void()> jobCall;

		class JobContext {
		public:
			JobContext() : m_ui32JobsPending(0), m_bQuit(false) { };

			boost::mutex m_oJobMutex;
		//	boost::mutex m_oJobsPendingMutex;

			std::deque<jobCall> m_vJobList;
			// std::deque<jobInfo> m_vJobList;
			OpenViBE::uint32 m_ui32JobsPending;

			boost::condition_variable m_oHaveWork;
			boost::condition_variable m_oJobDone;
	
			OpenViBE::boolean m_bQuit;
		};

		class CWorkerThread {
		public:
			CWorkerThread(JobContext& ctx) : m_ctx(ctx) { };
	
			JobContext& m_ctx;

			// void operator()(void)
			void run(void)
			{

				while(true)
				{
					jobCall job;
				//	jobInfo job((boolean (*)(void*))NULL,(void*)NULL);

					// Wait until we get a job or are told to quit
					{ // scope for lock
						boost::unique_lock<boost::mutex> lock(m_ctx.m_oJobMutex);

						while(!m_ctx.m_bQuit && m_ctx.m_vJobList.size()==0)
						{
							m_ctx.m_oHaveWork.wait(lock);
						}

						if(m_ctx.m_bQuit) {
							return;
						}

						// Ok, we have a job
						job = m_ctx.m_vJobList.front();
						m_ctx.m_vJobList.pop_front();
					}

					// do job
					// std::cout << "Hello " << job << " ( " << boost::this_thread::get_id() << ")\n";
			
//					std::cout << "Thread " << boost::this_thread::get_id() << " enter job\n";

					// @todo pass job failed to caller
					job();
					// (*job.first)(job.second);

//					std::cout << "Thread " << boost::this_thread::get_id() << " exit job\n";

					// job done, mark as done
					{ // scope for lock
						boost::unique_lock<boost::mutex> lock(m_ctx.m_oJobMutex);	
						m_ctx.m_ui32JobsPending--;
					}

					m_ctx.m_oJobDone.notify_one();
				}
			}

			static void startWorkerThread(CWorkerThread* pThread)
			{
				pThread->run();
				// (*pThread)();
			}

		};


		class parallelExecutor
		{
		public:
			parallelExecutor() { };	
			boolean initialize(const uint32 nThreads);
			boolean push(const jobCall& someJob);							// add job, pass to threads
			boolean pushList(const std::deque<jobCall>& vJobList);			// add jobs
			boolean waitForAll(void);						// wait until all pushed jobs are done
			boolean uninitialize();

		private:

			JobContext m_oJobContext;

			std::vector<CWorkerThread*> m_vWorkerThread;
			std::vector<boost::thread*> m_vThread;

		};



126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
		class CSchedulerMulticore : public OpenViBE::Kernel::CScheduler
		{
		public:

			CSchedulerMulticore(const OpenViBE::Kernel::IKernelContext& rKernelContext, OpenViBE::Kernel::CPlayer& rPlayer);
			virtual ~CSchedulerMulticore(void);

			virtual OpenViBE::boolean setScenario(
				const OpenViBE::CIdentifier& rScenarioIdentifier);
			virtual OpenViBE::boolean setFrequency(
				const OpenViBE::uint64 ui64Frequency);

			virtual SchedulerInitializationCode initialize(void);
			virtual OpenViBE::boolean uninitialize(void);

			virtual OpenViBE::boolean loop(void);

			virtual OpenViBE::boolean sendInput(const OpenViBE::Kernel::CChunk& rChunk, const OpenViBE::CIdentifier& rBoxIdentifier, const OpenViBE::uint32 ui32InputIndex);
			virtual OpenViBE::uint64 getCurrentTime(void) const;
			virtual OpenViBE::uint64 getFrequency(void) const;
			virtual OpenViBE::float64 getCPUUsage(void) const;

148 149 150 151
			virtual OpenViBE::boolean sendMessage(const IMessageWithData &msg, CIdentifier targetBox, uint32 inputIndex);

			static OpenViBE::boolean job(CSchedulerMulticore *context, CIdentifier id, CSimulatedBox* box);
			OpenViBE::boolean runBox(CIdentifier id, CSimulatedBox* box);
152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168

			_IsDerivedFromClass_Final_(OpenViBE::Kernel::TKernelObject < OpenViBE::Kernel::IKernelObject >, OVK_ClassId_Kernel_Player_Scheduler);

			CPlayer& getPlayer(void)
			{
				return m_rPlayer;
			}

		protected:

			OpenViBE::Kernel::CPlayer& m_rPlayer;
			OpenViBE::CIdentifier m_oScenarioIdentifier;
			OpenViBE::Kernel::IScenario* m_pScenario;
			OpenViBE::uint64 m_ui64Steps;
			OpenViBE::uint64 m_ui64Frequency;
			OpenViBE::uint64 m_ui64CurrentTime;
			OpenViBE::boolean m_bIsInitialized;
169
			OpenViBE::Kernel::parallelExecutor m_oExecutor;
170 171 172 173

			std::map < std::pair < OpenViBE::int32, OpenViBE::CIdentifier>, OpenViBE::Kernel::CSimulatedBox* > m_vSimulatedBox;
			std::map < OpenViBE::CIdentifier, System::CChrono > m_vSimulatedBoxChrono;
			std::map < OpenViBE::CIdentifier, std::map < OpenViBE::uint32, std::list < OpenViBE::Kernel::CChunk > > > m_vSimulatedBoxInput;
174
			std::map < OpenViBE::CIdentifier, boost::mutex* > m_vSimulatedBoxInputMutex;
175 176 177 178 179 180 181 182 183

		private:

			System::CChrono m_oBenchmarkChrono;
		};
	};
};

#endif // __OpenViBEKernel_Kernel_Scheduler_CSchedulerMulticore_H__