Commit 49ad168b authored by NICLAUSSE Nicolas's avatar NICLAUSSE Nicolas

thread safe implementation of RemoteSubmit. Change a bit the controller API

parent 15991707
......@@ -29,6 +29,7 @@ add_executable(${PROJECT_NAME}
target_link_libraries(${PROJECT_NAME}
dtkDistributed
dtkLog
dtkWidgets
)
target_link_libraries(${PROJECT_NAME} Qt5::Core)
......
......@@ -21,7 +21,6 @@ Item {
id: servers
ListElement { text: "localhost" }
ListElement { text: "nef-devel.inria.fr" }
ListElement { text: "nef-devel2.inria.fr" }
ListElement { text: "fsophia.sophia.grid5000.fr" }
ListElement { text: "controller.vg" }
}
......@@ -120,7 +119,7 @@ Item {
tooltip: "Deploy resource manager daemon on remote host"
onClicked: {
controller.deploy( Status.url(false),combotypes.currentText, tunnel.checked, serverPath.text, "trace");
controller.connect( Status.url(false), tunnel.checked, true);
controller.connectSrv( Status.url(false), true);
Status.show()
}
}
......
......@@ -20,18 +20,24 @@
#include <dtkDistributed/dtkDistributedPolicy.h>
#include <dtkDistributed/dtkDistributedSettings.h>
#include <dtkWidgets/dtkApplication.h>
#include <QtQml>
#include <QtQml/QQmlApplicationEngine>
#include <QtQuick>
#include <QGuiApplication>
int main(int argc, char *argv[])
{
QGuiApplication app(argc, argv);
app.setOrganizationName("inria");
app.setOrganizationDomain("fr");
app.setApplicationName("dtkDistributedDashboard");
dtkApplication *app = dtkApplication::create(argc, argv);
app->setOrganizationName("inria");
app->setOrganizationDomain("fr");
app->setApplicationName("dtkDistributedDashboard");
QCommandLineParser *parser = app->parser();
parser->setApplicationDescription("DTK Distributed Dashboard. Example application using dtkDistributedController and QML.");
app->initialize();
qmlRegisterType<dtkDistributedController>("dtkDistributed", 1, 0, "DistributedController");
qmlRegisterType<dtkDistributedPolicy>("dtkDistributed", 1, 0, "DistributedPolicy");
......@@ -42,7 +48,7 @@ int main(int argc, char *argv[])
settings.endGroup();
QQmlApplicationEngine engine(QUrl("qrc:/dashmain.qml"));
return app.exec();
return app->exec();
}
......@@ -23,10 +23,8 @@ function policy()
function guess_type(server)
{
if (/nef-devel2/.test(server)) {
if (/nef/.test(server)) {
return "oar"
} else if (/nef/.test(server)) {
return "torque"
} else if (/grid5000/.test(server)) {
return "oar"
} else {
......
......@@ -71,7 +71,7 @@ int main(int argc, char **argv)
parser->addOption(serverOption);
app->initialize();
dtkDistributed::communicator::setInstance(app->communicator());
if (!parser->isSet(serverOption)) {
qCritical() << "Error: no server set ! Use --server <url> " ;
return 1;
......
......@@ -19,6 +19,7 @@
#include "dtkComposerTransmitterReceiver.h"
#include "dtkComposerTransmitterProxyVariant.h"
#include <dtkDistributed/dtkDistributed.h>
#include <dtkDistributed/dtkDistributedController.h>
#include <dtkDistributed/dtkDistributedCommunicator.h>
#include <dtkDistributed/dtkDistributedSettings.h>
......@@ -34,20 +35,20 @@
dtkComposerNodeRemote::dtkComposerNodeRemote(void) : QObject(), dtkComposerNodeComposite(), d(new dtkComposerNodeRemotePrivate)
{
static qlonglong count = 0;
// static QAtomicInt count;
this->appendReceiver(&(d->jobid_receiver));
this->appendReceiver(&(d->socket_emitter));
this->appendEmitter(&(d->socket_emitter));
// int id = count.fetchAndAddRelaxed(1);
d->communicator = NULL;
d->socket = NULL;
d->controller = NULL;
d->slave = NULL;
d->rank = dtkDistributedMessage::CONTROLLER_RUN_RANK - count;
d->rank = dtkDistributedMessage::CONTROLLER_RUN_RANK;
d->title = "Remote";
count ++;
}
dtkComposerNodeRemote::~dtkComposerNodeRemote(void)
......@@ -106,8 +107,9 @@ void dtkComposerNodeRemote::onJobStarted(QString jobid)
{
if (jobid == d->jobid) {
QObject::disconnect( dtkDistributedController::instance(), SIGNAL(jobStarted(QString)), this, SLOT(onJobStarted(QString)));
emit jobStarted(jobid);
} else {
dtkDebug() << "A job has started, but it's not ours, keep waiting " << d->jobid << jobid ;
dtkDebug() << "A job has started: " << jobid << ", but it's not ours, keep waiting " << d->jobid ;
}
}
......@@ -120,13 +122,13 @@ void dtkComposerNodeRemote::begin(void)
// not drag&dropped, get job from transmitter and main
// controller instance
d->jobid = d->jobid_receiver.data();
d->controller = dtkDistributedController::instance();
d->controller = dtkDistributed::controller::instance();
if (!d->controller->is_running(d->jobid)) {
dtkDebug() << " Wait for job to start, jobid is " << d->jobid;
QEventLoop loop;
this->connect(d->controller, SIGNAL(jobStarted(QString)), this, SLOT(onJobStarted(QString)), Qt::DirectConnection);
loop.connect(d->controller, SIGNAL(jobStarted(QString)), &loop, SLOT(quit()));
this->connect(d->controller, SIGNAL(jobStarted(QString)), this, SLOT(onJobStarted(QString)));
loop.connect(this, SIGNAL(jobStarted(QString)), &loop, SLOT(quit()));
loop.connect(qApp, SIGNAL(aboutToQuit()), &loop, SLOT(quit()));
loop.exec();
dtkTrace() << "waiting event loop ended, job has started" << d->jobid;
......@@ -334,10 +336,13 @@ public:
public:
QString slaveName;
QString submit_id;
};
dtkComposerNodeRemoteSubmit::dtkComposerNodeRemoteSubmit(void) : dtkComposerNodeLeaf(), d(new dtkComposerNodeRemoteSubmitPrivate)
{
static QAtomicInt id = 66666;
d->submit_id = QString::number(id.fetchAndAddRelaxed(1));
this->appendReceiver(&(d->cluster));
this->appendReceiver(&(d->nodes));
this->appendReceiver(&(d->cores));
......@@ -425,7 +430,7 @@ void dtkComposerNodeRemoteSubmit::run(void)
dtkTrace() << " submit job with parameters: " << job_data;
dtkDistributedController *controller = dtkDistributedController::instance();
dtkDistributedController *controller = dtkDistributed::controller::instance();
if (!controller->isConnected(cluster)) {
dtkInfo() << "Not yet connected to " << cluster << ",try to connect";
......@@ -433,21 +438,21 @@ void dtkComposerNodeRemoteSubmit::run(void)
bool use_tunnel = settings.use_ssh_tunnel(cluster);
bool register_rank = true;
if (controller->deploy(cluster, settings.server_type(cluster), use_tunnel , settings.path(cluster) )) {
if (dtkDistributed::controller::deploy(cluster, settings.server_type(cluster), use_tunnel , settings.path(cluster) )) {
QThread::sleep(1);
controller->connect(cluster, use_tunnel, register_rank);
dtkDistributed::controller::connectSrv(cluster, register_rank);
} else {
dtkError() << "Unable to deploy server" << cluster;
dtkFatal() << "Unable to deploy server" << cluster;
return;
}
} else {
dtkInfo() << "Controller is already connection to server " << cluster;
}
if (controller->submit(cluster, job_data)) {
if (dtkDistributed::controller::submit(cluster, job_data, d->submit_id)) {
QEventLoop loop;
this->connect(controller, SIGNAL(jobQueued(QString)), this, SLOT(onJobQueued(QString)), Qt::DirectConnection);
loop.connect(controller, SIGNAL(jobQueued(QString)), &loop, SLOT(quit()));
this->connect(controller, SIGNAL(jobQueued(QString, QString)), this, SLOT(onJobQueued(QString, QString)), Qt::QueuedConnection);
loop.connect(this, SIGNAL(jobQueued(QString)), &loop, SLOT(quit()));
loop.connect(qApp, SIGNAL(aboutToQuit()), &loop, SLOT(quit()));
loop.exec();
......@@ -457,8 +462,14 @@ void dtkComposerNodeRemoteSubmit::run(void)
dtkError() << "failed to submit ";
}
void dtkComposerNodeRemoteSubmit::onJobQueued(const QString& job_id)
void dtkComposerNodeRemoteSubmit::onJobQueued(const QString& job_id, const QString& submit_id)
{
d->id.setData(job_id);
QObject::disconnect( dtkDistributedController::instance(), SIGNAL(jobQueued(QString)), this, SLOT(onJobQueued(QString)));
if (submit_id == d->submit_id) {
dtkDebug() << "our job has been queued" << job_id << submit_id;
d->id.setData(job_id);
QObject::disconnect( dtkDistributedController::instance(), SIGNAL(jobQueued(QString, QString)), this, SLOT(onJobQueued(QString, QString)));
emit jobQueued(job_id);
} else {
dtkDebug() << "Another job has been queued" << submit_id << "our id is " << d->submit_id ;
}
}
......@@ -40,6 +40,9 @@ public:
dtkComposerNodeRemote(void);
virtual ~dtkComposerNodeRemote(void);
signals:
void jobStarted(QString jobid);
public slots:
void onJobStarted(QString id);
......@@ -84,8 +87,11 @@ public:
public:
void run(void);
signals:
void jobQueued(QString jobid);
public slots:
void onJobQueued(const QString& job_id);
void onJobQueued(const QString& job_id, const QString& submit_id);
public:
void setSlaveName(QString);
......
......@@ -19,6 +19,7 @@
#include "dtkDistributed.h"
#include "dtkDistributedApplication.h"
#include "dtkDistributedCommunicator.h"
#include "dtkDistributedController.h"
#include "dtkDistributedPolicy.h"
#include "dtkDistributedSettings.h"
......@@ -127,6 +128,11 @@ namespace dtkDistributed {
pluginManager().initialize(path);
}
}
// used to dtkDistributedGuiApplication: we can't link here to dtkWidgets so we need a setter
void setInstance(dtkDistributedCommunicator *comm)
{
_private::communicator = comm;
}
dtkDistributedCommunicator *instance(void)
{
......@@ -146,4 +152,51 @@ namespace dtkDistributed {
return _private::communicator;
}
}
namespace controller {
namespace _private {
dtkDistributedController *controller = NULL;
QThread *controllerThread = NULL;
QMutex mutex;
}
dtkDistributedController *instance(void)
{
QMutexLocker locker(&_private::mutex);
if (_private::controller) {
return _private::controller;
} else {
dtkInfo() << "no controller: create a default controller in its own thread";
_private::controller = dtkDistributedController::instance();
_private::controllerThread = new QThread();
_private::controller->moveToThread(_private::controllerThread);
QObject::connect(_private::controllerThread, SIGNAL(finished()), _private::controllerThread, SLOT(deleteLater()));
_private::controllerThread->start();
return _private::controller;
}
}
bool connectSrv(const QUrl& server, bool ssh_rank, bool emit_connected)
{
bool res;
QMetaObject::invokeMethod(_private::controller, "connectSrv", Qt::BlockingQueuedConnection, Q_RETURN_ARG(bool, res),
Q_ARG(QUrl, server), Q_ARG(bool, ssh_rank), Q_ARG(bool, emit_connected) );
return res;
}
bool deploy(const QUrl& server, QString type, bool ssh_tunnel, QString path, QString loglevel)
{
bool res;
QMetaObject::invokeMethod(_private::controller, "deploy", Qt::BlockingQueuedConnection, Q_RETURN_ARG(bool, res),
Q_ARG(QUrl, server), Q_ARG(QString, type), Q_ARG(bool, ssh_tunnel), Q_ARG(QString, path), Q_ARG(QString, loglevel) );
return res;
}
bool submit(const QUrl& server, const QByteArray& resources, const QString& submit_id)
{
bool res;
QMetaObject::invokeMethod(_private::controller, "submit", Qt::BlockingQueuedConnection, Q_RETURN_ARG(bool, res),
Q_ARG(QUrl, server), Q_ARG(QByteArray, resources), Q_ARG(QString, submit_id) );
return res;
}
}
}
......@@ -16,9 +16,11 @@
#include "dtkDistributedExport.h"
#include <QtCore>
#include <QUrl>
class dtkDistributedApplication;
class dtkDistributedCommunicator;
class dtkDistributedController;
class dtkDistributedCoreApplication;
class dtkDistributedCommunicatorPlugin;
class dtkDistributedCommunicatorPluginFactory;
......@@ -84,8 +86,26 @@ namespace dtkDistributed {
namespace communicator {
DTKDISTRIBUTED_EXPORT dtkDistributedCommunicatorPluginFactory& pluginFactory(void);
DTKDISTRIBUTED_EXPORT dtkDistributedCommunicatorPluginManager& pluginManager(void);
DTKDISTRIBUTED_EXPORT void initialize(const QString& path = QString());
DTKDISTRIBUTED_EXPORT dtkDistributedCommunicator *instance(void);
DTKDISTRIBUTED_EXPORT dtkDistributedCommunicator *instance(void);
DTKDISTRIBUTED_EXPORT void initialize(const QString& path = QString());
DTKDISTRIBUTED_EXPORT void setInstance(dtkDistributedCommunicator *comm);
}
namespace controller {
DTKDISTRIBUTED_EXPORT dtkDistributedController *instance(void);
// wrappers to force the execution of the methods in the controller thread
DTKDISTRIBUTED_EXPORT bool connectSrv(const QUrl& server, bool ssh_rank = false, bool emit_connected = true);
DTKDISTRIBUTED_EXPORT bool deploy(const QUrl& server, QString type = "local", bool ssh_tunnel = false, QString path = "./dtkDistributedServer", QString loglevel = "info");
DTKDISTRIBUTED_EXPORT bool submit(const QUrl& server, const QByteArray& resources, const QString& submit_id = "");
/* DTKDISTRIBUTED_EXPORT void disconnect(const QUrl& server); */
/* DTKDISTRIBUTED_EXPORT void stop(const QUrl& server); */
/* DTKDISTRIBUTED_EXPORT void refresh(const QUrl& server); */
/* DTKDISTRIBUTED_EXPORT void killjob(const QUrl& server, QString jobid); */
/* void send(dtkDistributedMessage *msg); */
/* void send(QVariant v, QString jobid, qint16 destrank); */
}
}
......
......@@ -57,18 +57,18 @@ signals:
void dataPosted(QVariant data);
void jobEnded(QString jobid);
void jobStarted(QString jobid);
void jobQueued(QString jobid);
void jobQueued(QString jobid, QString submit_id);
public slots:
bool connect(const QUrl& server, bool ssh_tunnel = false, bool ssh_rank = false, bool emit_connected = true);
void disconnect(const QUrl& server);
bool connectSrv(const QUrl& server, bool ssh_rank = false, bool emit_connected = true);
void disconnectSrv(const QUrl& server);
void stop(const QUrl& server);
void refresh(const QUrl& server);
void killjob(const QUrl& server, QString jobid);
void send(dtkDistributedMessage *msg);
void send(QVariant v, QString jobid, qint16 destrank);
bool deploy(const QUrl& server, QString type = "local", bool ssh_tunnel = false, QString path = "./dtkDistributedServer", QString loglevel = "info");
bool submit(const QUrl& server, QByteArray& resources);
bool submit(const QUrl& server, const QByteArray& resources, const QString& submit_id = "");
public:
bool is_running(const QString& jobid);
......
......@@ -88,6 +88,9 @@ void dtkDistributedMessage::setMethod(QString method)
d->method = STATUS;
} else if (method.startsWith("PUT /job HTTP")) {
d->method = NEWJOB;
} else if (method.startsWith("PUT /job/")) {
d->method = NEWJOB;
d->rank = tokens[2].remove("HTTP").toInt();
} else if (method.startsWith("DELETE /job")) {
d->jobid = tokens[2].remove("HTTP").trimmed();
......@@ -215,11 +218,12 @@ QString dtkDistributedMessage::req(void)
break;
case NEWJOB:
req = "PUT /job HTTP/1.1";
dtkTrace() << "newjob, rank (submit_id) is " << d->rank;
req = "PUT /job/"+QString::number(d->rank) + " HTTP/1.1";
break;
case OKJOB:
req = "HTTP/1.1 201 OK\nX-DTK-JobId: " + d->jobid;
req = "HTTP/1.1 201 OK\nX-DTK-JobId: " + d->jobid + "\nX-DTK-RankId: " + QString::number(d->rank);
break;
case ERRORJOB:
......@@ -366,7 +370,10 @@ qlonglong dtkDistributedMessage::parse(QTcpSocket *socket)
this->setSize();
if (d->headers.contains("x-dtk-jobid")) {
d->jobid = d->headers["x-dtk-jobid"].trimmed();;
d->jobid = d->headers["x-dtk-jobid"].trimmed();
}
if (d->headers.contains("x-dtk-rankid")) {
d->rank = d->headers["x-dtk-rankid"].trimmed().toInt();
}
dtkTrace() << "parse: jobid is" << d->jobid << "size:" << d->size << "method:" << this->methodString();
......
......@@ -158,6 +158,8 @@ void dtkDistributedServerDaemon::read(void)
dtkDebug() << "****** read message of type" << msg->methodString();
int submit_id;
switch (msg->method()) {
case dtkDistributedMessage::STATUS:
r = d->manager->status();
......@@ -171,12 +173,14 @@ void dtkDistributedServerDaemon::read(void)
break;
case dtkDistributedMessage::NEWJOB:
submit_id = msg->rank();
jobid = d->manager->submit(msg->content());
dtkDebug() << "New job queued for " << submit_id << ", jobid is" << jobid;
if (jobid == "ERROR") {
resp.reset(new dtkDistributedMessage(dtkDistributedMessage::ERRORJOB, jobid));
resp.reset(new dtkDistributedMessage(dtkDistributedMessage::ERRORJOB, jobid, submit_id));
} else {
resp.reset(new dtkDistributedMessage(dtkDistributedMessage::OKJOB, jobid));
resp.reset(new dtkDistributedMessage(dtkDistributedMessage::OKJOB, jobid, submit_id));
}
resp->send(socket);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment