Commit 1e2ec80b authored by NICLAUSSE Nicolas's avatar NICLAUSSE Nicolas

use new ProxyVariant in Remote nodes

parent a97b1724
......@@ -37,13 +37,25 @@ int main(int argc, char **argv)
QCommandLineParser *parser = application->parser();
parser->setApplicationDescription("DTK composer evaluator. Run the given compostion (XML file).");
// /////////////////////////////////////////////////////////////////
// Arguments & Options
// /////////////////////////////////////////////////////////////////
QCommandLineOption pgOption("pg", "enable profiling");
parser->addOption(pgOption);
QCommandLineOption slaveOption("slave", "run as a dtk distributed slave instance");
parser->addOption(slaveOption);
QCommandLineOption serverOption("server", "DTK distributed server URL", "URL");
parser->addOption(serverOption);
application->initialize();
QStringList args = parser->positionalArguments();
if (args.isEmpty()) {
if (args.isEmpty() && !parser->isSet(slaveOption)) {
qDebug() << "Usage: " << argv[0] << "--spawn | [-pg] <composition> ";
return 1;
}
......@@ -52,19 +64,19 @@ int main(int argc, char **argv)
// Old Plugin manager initialization
// /////////////////////////////////////////////////////////////////
QSettings *main_settings = application->settings();
main_settings->beginGroup("plugins");
// QSettings *main_settings = application->settings();
// main_settings->beginGroup("plugins");
if (main_settings->contains("path")) {
dtkPluginManager::instance()->setPath(main_settings->value("path").toString());
}
// if (main_settings->contains("path")) {
// dtkPluginManager::instance()->setPath(main_settings->value("path").toString());
// }
if (parser->isSet("verbose")) {
dtkPluginManager::instance()->setVerboseLoading(true);
}
// if (parser->isSet("verbose")) {
// dtkPluginManager::instance()->setVerboseLoading(true);
// }
main_settings->endGroup();
dtkPluginManager::instance()->initialize();
// main_settings->endGroup();
// dtkPluginManager::instance()->initialize();
dtkComposerFactory *factory = new dtkComposerFactory;
......@@ -82,36 +94,52 @@ int main(int argc, char **argv)
// return value;
// }
dtkComposerScene *scene = new dtkComposerScene;
dtkComposerStack *stack = new dtkComposerStack;
dtkComposerGraph *graph = new dtkComposerGraph;
dtkComposerEvaluator *evaluator = new dtkComposerEvaluator;;
scene->setFactory(factory);
scene->setStack(stack);
scene->setGraph(graph);
evaluator->setGraph(graph);
if (parser->isSet(slaveOption)) {
dtkComposerEvaluatorSlave *slave = new dtkComposerEvaluatorSlave;
slave->setInternalCommunicator(dtkDistributed::communicator::instance());
if (!parser->isSet(serverOption)) {
qCritical() << "Error: no server set when running as slave! Use --server <url> " ;
return 1;
}
slave->setServer(parser->value(serverOption));
slave->setFactory(factory);
dtkComposerReader *reader;
reader = new dtkComposerReader;
reader->setFactory(factory);
reader->setScene(scene);
reader->setGraph(graph);
application->spawn();
do { application->exec(slave); } while (slave->status() == 0);
application->unspawn();
if (parser->isSet(pgOption)) {
evaluator->setProfiling(true);
}
if (!reader->read(args.first())) {
dtkError() << "read failure for " << args.first();
return 1;
}
if (no_gui) {
evaluator->run_static();
} else {
QObject::connect(evaluator,SIGNAL(evaluationStopped()),qApp, SLOT(quit()));
QtConcurrent::run(evaluator, &dtkComposerEvaluator::run_static, false);
qApp->exec();
dtkComposerScene *scene = new dtkComposerScene;
dtkComposerStack *stack = new dtkComposerStack;
dtkComposerGraph *graph = new dtkComposerGraph;
dtkComposerEvaluator *evaluator = new dtkComposerEvaluator;;
scene->setFactory(factory);
scene->setStack(stack);
scene->setGraph(graph);
evaluator->setGraph(graph);
dtkComposerReader *reader;
reader = new dtkComposerReader;
reader->setFactory(factory);
reader->setScene(scene);
reader->setGraph(graph);
if (parser->isSet(pgOption)) {
evaluator->setProfiling(true);
}
if (!reader->read(args.first())) {
dtkError() << "read failure for " << args.first();
return 1;
}
if (no_gui) {
evaluator->run_static();
} else {
QObject::connect(evaluator,SIGNAL(evaluationStopped()),qApp, SLOT(quit()));
QtConcurrent::run(evaluator, &dtkComposerEvaluator::run_static, false);
qApp->exec();
}
}
dtkPluginManager::instance()->uninitialize();
}
......
......@@ -4,6 +4,7 @@
#include "dtkComposerControlsDelegate.h"
#include "dtkComposerEvaluator.h"
#include "dtkComposerEvaluatorProcess.h"
#include "dtkComposerEvaluatorSlave.h"
#include "dtkComposerFactory.h"
#include "dtkComposerGraph.h"
#include "dtkComposerGraphEdge.h"
......
......@@ -52,6 +52,7 @@ public:
QUrl server;
int count;
int last_controller_rank;
int status;
public:
QMap<int, QString> composition_cache;
......@@ -76,7 +77,8 @@ dtkComposerEvaluatorSlave::dtkComposerEvaluatorSlave(void) : dtkDistributedSlave
d->reader->setScene(d->scene);
d->reader->setGraph(d->graph);
d->count = 0;
d->count = 0;
d->status = 0;
}
dtkComposerEvaluatorSlave::~dtkComposerEvaluatorSlave(void)
......@@ -119,12 +121,19 @@ void dtkComposerEvaluatorSlave::setInternalCommunicator(dtkDistributedCommunicat
}
}
int dtkComposerEvaluatorSlave::exec(void)
int dtkComposerEvaluatorSlave::status(void)
{
return d->status;
}
void dtkComposerEvaluatorSlave::run(void)
{
d->status = 0;
if (!d->factory) {
dtkFatal() << "No factory set ! abort slave execution";
return 1;
d->status = 1;
return;
}
int rank = d->communicator_i->rank();
......@@ -150,7 +159,8 @@ int dtkComposerEvaluatorSlave::exec(void)
msg->send(d->composition_socket);
} else {
dtkError() << "Can't connect to server";
return 1;
d->status = 1;
return;
}
}
......@@ -161,7 +171,8 @@ int dtkComposerEvaluatorSlave::exec(void)
this->socket()->setParent(0);
} else {
dtkFatal() << "Can't connect to server" << d->server;
return 1;
d->status = 1;
return;
}
}
......@@ -173,7 +184,8 @@ int dtkComposerEvaluatorSlave::exec(void)
dtkInfo() << "data already available, try to parse composition " << d->composition_socket->bytesAvailable();
} else if (!d->composition_socket->waitForReadyRead(600000)) {
dtkFatal() << "No data received from server after 10mn, abort " ;
return 1;
d->status = 1;
return;
} else
dtkDebug() << "Ok, data received, parse" ;
......@@ -195,12 +207,14 @@ int dtkComposerEvaluatorSlave::exec(void)
}
} else {
dtkFatal() << "Bad composition type, abort" << msg->type() << msg->content();
return 1;
d->status = 1;
return;
}
if (new_composition && composition.isEmpty()) {
dtkFatal() << "Empty composition, abort" ;
return 1;
d->status = 1;
return;
}
dtkDebug() << "got composition from controller:" << composition;
......@@ -228,7 +242,8 @@ int dtkComposerEvaluatorSlave::exec(void)
remote->setCommunicator(d->communicator_i);
} else {
dtkFatal() << "Can't find remote node in composition, abort";
return 1;
d->status = 1;
return;
}
}
dtkDebug() << "run composition" ;
......@@ -248,6 +263,7 @@ int dtkComposerEvaluatorSlave::exec(void)
workerThread->wait();
workerThread->deleteLater();
// d->evaluator->run_static();
dtkDebug() << "finished" ;
} else {
......@@ -278,12 +294,12 @@ int dtkComposerEvaluatorSlave::exec(void)
workerThread->wait();
workerThread->deleteLater();
// d->evaluator->run_static();
dtkDebug() << "finished" ;
} else {
dtkFatal() << "Can't find remote node in composition, abort";
return 1;
d->status = 1;
return;
}
}
return 0;
}
......@@ -23,7 +23,7 @@ class dtkComposerNodeFactory;
class dtkDistributedCommunicator;
class dtkComposerEvaluatorSlavePrivate;
class DTKCOMPOSER_EXPORT dtkComposerEvaluatorSlave : public dtkDistributedSlave
class DTKCOMPOSER_EXPORT dtkComposerEvaluatorSlave : public dtkDistributedSlave, public QRunnable
{
public:
......@@ -37,7 +37,8 @@ public:
void setInternalCommunicator(dtkDistributedCommunicator *communicator);
public:
int exec(void);
void run(void);
int status(void);
private:
dtkComposerEvaluatorSlavePrivate *d;
......
......@@ -17,6 +17,7 @@
#include "dtkComposerTransmitterEmitter.h"
#include "dtkComposerTransmitterReceiver.h"
#include "dtkComposerTransmitterProxyVariant.h"
#include <dtkDistributed/dtkDistributedController.h>
#include <dtkDistributed/dtkDistributedCommunicator.h>
......@@ -164,9 +165,9 @@ void dtkComposerNodeRemote::begin(void)
QList<dtkComposerTransmitter*> receivers = this->dtkComposerNodeComposite::receivers();
int max = receivers.count();
dtkComposerTransmitterReceiverVariant *t = NULL;
dtkComposerTransmitterProxyVariant *t = NULL;
for (int i = 2; i < max; ++i) {
t = reinterpret_cast<dtkComposerTransmitterReceiverVariant*>(receivers.at(i));
t = reinterpret_cast<dtkComposerTransmitterProxyVariant*>(receivers.at(i));
dtkDebug() << "sending transmitter" << i;
msg.reset(new dtkDistributedMessage(dtkDistributedMessage::DATA, d->jobid, 0, t->variant()));
// FIXME: is it the good socket ?
......@@ -178,9 +179,9 @@ void dtkComposerNodeRemote::begin(void)
QList<dtkComposerTransmitter*> receivers = this->dtkComposerNodeComposite::receivers();
int max = receivers.count();
int size = d->communicator->size();
dtkComposerTransmitterReceiverVariant *t = NULL;
dtkComposerTransmitterProxyVariant *t = NULL;
for (int i = 2; i < max; ++i) {
t = reinterpret_cast<dtkComposerTransmitterReceiverVariant*>(receivers.at(i));
t = reinterpret_cast<dtkComposerTransmitterProxyVariant*>(receivers.at(i));
if (d->communicator->rank() == 0) {
d->socket_emitter.setData(d->slave->socket());
......@@ -197,30 +198,18 @@ void dtkComposerNodeRemote::begin(void)
}
msg.reset();
msg->parse(d->slave->socket());
// t->setTwinned(false);
t->clearData();
// t->setVariant(msg->variant());
// t->setTwinned(true);
t->setVariant(msg->variant());
dtkDebug() << "send data to slaves";
//FIXME
for (int j=1; j< size; j++)
d->communicator->send(msg->content(),j,0);
d->communicator->broadcast(msg->variant(), 0);
} else {
QByteArray array;
QVariant variant;
dtkDebug() << "receive data from rank 0";
//FIXME
d->communicator->receive(array, 0, 0);
d->communicator->broadcast(variant, 0);
dtkDebug() << "data received, set";
// t->setTwinned(false);
t->clearData();
QDataStream stream(&array, QIODevice::ReadOnly);
QVariant variant;
stream >> variant;
// FIXME : how can we set data ?
// t->setVariant(variant);
// t->setTwinned(true);
t->setVariant(variant);
}
}
} else {
......@@ -241,9 +230,9 @@ void dtkComposerNodeRemote::end(void)
dtkDebug() << "running node remote end statement on controller";
QList<dtkComposerTransmitter*> emitters = this->dtkComposerNodeComposite::emitters();
int max = this->emitters().count();
dtkComposerTransmitterEmitterVariant *t = NULL;
dtkComposerTransmitterProxyVariant *t = NULL;
for (int i = 1; i < max; ++i) {
t = reinterpret_cast<dtkComposerTransmitterEmitterVariant*>(emitters.at(i));
t = reinterpret_cast<dtkComposerTransmitterProxyVariant*>(emitters.at(i));
if (d->socket->bytesAvailable()) {
dtkDebug() << "data already available, parse" ;
......@@ -255,10 +244,8 @@ void dtkComposerNodeRemote::end(void)
dtkDebug() << "Ok, data received, parse" ;
}
msg->parse(d->socket);
// t->setTwinned(false);
t->clearData();
t->setVariant(msg->variant());
// t->setTwinned(true);
}
} else if (d->communicator) {
// running on the slave, send data and set transmitters
......@@ -267,9 +254,9 @@ void dtkComposerNodeRemote::end(void)
int max = this->emitters().count();
int size = d->communicator->size();
Q_UNUSED(size);
dtkComposerTransmitterEmitterVariant *t = NULL;
dtkComposerTransmitterProxyVariant *t = NULL;
for (int i = 1; i < max; ++i) {
t = reinterpret_cast<dtkComposerTransmitterEmitterVariant*>(emitters.at(i));
t = reinterpret_cast<dtkComposerTransmitterProxyVariant*>(emitters.at(i));
// FIXME: use our own transmitter variant list (see control nodes)
if (d->communicator->rank() == 0) {
dtkDebug() << "end, send transmitter data (we are rank 0)";
......@@ -326,7 +313,7 @@ dtkComposerNodeRemoteSubmit::dtkComposerNodeRemoteSubmit(void) : dtkComposerNode
this->appendReceiver(&(d->walltime));
this->appendReceiver(&(d->queuename));
d->slaveName = "dtkComposerEvaluatorSlave";
d->slaveName = "dtkComposerEvaluator --slave";
this->appendEmitter(&(d->id));
}
......@@ -384,7 +371,7 @@ void dtkComposerNodeRemoteSubmit::run(void)
job.insert("queue", d->queuename.data());
job.insert("properties", QVariantMap());
job.insert("application", d->slaveName+" "+cluster.toString());
job.insert("application", d->slaveName+" --server "+cluster.toString());
QByteArray job_data = QJsonDocument(QJsonObject::fromVariantMap(job)).toJson();
dtkTrace() << " submit job with parameters: "<< job_data;
......
......@@ -41,6 +41,7 @@
#include "dtkComposerTransmitter.h"
#include "dtkComposerTransmitterProxy.h"
#include "dtkComposerTransmitterProxyLoop.h"
#include "dtkComposerTransmitterProxyVariant.h"
#include <dtkCore/dtkCore.h>
// #include <dtkCore/dtkAbstractDataFactory.h>
......@@ -587,6 +588,11 @@ dtkComposerSceneNode *dtkComposerReader::readNode(QDomNode node, bool paste)
proxyloop->setRequired(false);
composite->wrapee()->appendReceiver(proxyloop);
}
if (ports.at(i).toElement().attribute("kind") == "proxyvariant") {
dtkComposerTransmitter *proxyvariant = new dtkComposerTransmitterProxyVariant(composite->wrapee());
proxyvariant->setRequired(false);
composite->wrapee()->appendReceiver(proxyvariant);
}
if (ports.at(i).toElement().hasAttribute("loop"))
port->setLoop(ports.at(i).toElement().attribute("loop").toInt());
} else {
......@@ -606,6 +612,11 @@ dtkComposerSceneNode *dtkComposerReader::readNode(QDomNode node, bool paste)
proxyloop->setRequired(false);
composite->wrapee()->appendEmitter(proxyloop);
}
if (ports.at(i).toElement().attribute("kind") == "proxyvariant") {
dtkComposerTransmitter *proxyvariant = new dtkComposerTransmitterProxyVariant(composite->wrapee());
proxyvariant->setRequired(false);
composite->wrapee()->appendEmitter(proxyvariant);
}
if (ports.at(i).toElement().hasAttribute("loop"))
port->setLoop(ports.at(i).toElement().attribute("loop").toInt());
......
......@@ -609,7 +609,7 @@ void dtkComposerSceneNodeComposite::paint(QPainter *painter, const QStyleOptionG
void dtkComposerSceneNodeComposite::dragEnterEvent(QGraphicsSceneDragDropEvent *event)
{
#if defined(DTK_BUILD_SUPPORT_DISTRIBUTED)
#if defined(DTK_BUILD_SUPPORT)
dtkComposerNodeRemote *remote = dynamic_cast<dtkComposerNodeRemote *>(this->wrapee());
if(!remote) {
......
......@@ -1065,10 +1065,10 @@ void dtkComposerSceneNodeEditor::addInputPort(void)
command->setNode(dynamic_cast<dtkComposerSceneNodeComposite *>(d->node));
command->setType(dtkComposerScenePort::Input);
// #if defined(DTK_BUILD_DISTRIBUTED)
// if (dynamic_cast<dtkComposerNodeRemote *>(d->node->wrapee()))
// command->setKind(dtkComposerTransmitter::Variant);
// #endif
#if defined(DTK_BUILD_DISTRIBUTED)
if (dynamic_cast<dtkComposerNodeRemote *>(d->node->wrapee()))
command->setKind(dtkComposerTransmitter::ProxyVariant);
#endif
}
d->stack->push(command);
......@@ -1141,10 +1141,10 @@ void dtkComposerSceneNodeEditor::addOutputPort(void)
command->setNode(dynamic_cast<dtkComposerSceneNodeComposite *>(d->node));
command->setType(dtkComposerScenePort::Output);
// #if defined(DTK_BUILD_DISTRIBUTED)
// if (dynamic_cast<dtkComposerNodeRemote *>(d->node->wrapee()))
// command->setKind(dtkComposerTransmitter::Variant);
// #endif
#if defined(DTK_BUILD_DISTRIBUTED)
if (dynamic_cast<dtkComposerNodeRemote *>(d->node->wrapee()))
command->setKind(dtkComposerTransmitter::ProxyVariant);
#endif
}
d->stack->push(command);
......
......@@ -40,6 +40,7 @@
#include "dtkComposerTransmitter.h"
#include "dtkComposerTransmitterProxy.h"
#include "dtkComposerTransmitterProxyLoop.h"
#include "dtkComposerTransmitterProxyVariant.h"
#include "dtkComposerWriter.h"
#include <dtkLog/dtkLogger.h>
......@@ -1567,6 +1568,9 @@ void dtkComposerStackCommandCreatePort::redo(void)
case dtkComposerTransmitter::ProxyLoop:
e->transmitter = new dtkComposerTransmitterProxyLoop(e->node->wrapee());
break;
case dtkComposerTransmitter::ProxyVariant:
e->transmitter = new dtkComposerTransmitterProxyVariant(e->node->wrapee());
break;
default:
e->transmitter = new dtkComposerTransmitterProxy(e->node->wrapee());
break;
......@@ -1585,6 +1589,9 @@ void dtkComposerStackCommandCreatePort::redo(void)
case dtkComposerTransmitter::ProxyLoop:
e->transmitter = new dtkComposerTransmitterProxyLoop(e->node->wrapee());
break;
case dtkComposerTransmitter::ProxyVariant:
e->transmitter = new dtkComposerTransmitterProxyVariant(e->node->wrapee());
break;
default:
e->transmitter = new dtkComposerTransmitterProxy(e->node->wrapee());
break;
......@@ -2428,14 +2435,14 @@ void dtkComposerStackCommandReparentNode::redo(void)
if(e->direction == dtkComposerStackCommandReparentNodePrivate::Down) {
#if defined(DTK_BUILD_DISTRIBUTED)
if (dynamic_cast<dtkComposerNodeRemote *>(target->wrapee()))
command->setKind(dtkComposerTransmitter::ProxyLoop);
command->setKind(dtkComposerTransmitter::ProxyVariant);
#endif
command->setNode(target);
}
if(e->direction == dtkComposerStackCommandReparentNodePrivate::Up) {
#if defined(DTK_BUILD_DISTRIBUTED)
if (dynamic_cast<dtkComposerNodeRemote *>(source->wrapee()))
command->setKind(dtkComposerTransmitter::ProxyLoop);
command->setKind(dtkComposerTransmitter::ProxyVariant);
#endif
command->setNode(source);
}
......@@ -2534,14 +2541,14 @@ void dtkComposerStackCommandReparentNode::redo(void)
if(e->direction == dtkComposerStackCommandReparentNodePrivate::Down) {
#if defined(DTK_BUILD_DISTRIBUTED)
if (dynamic_cast<dtkComposerNodeRemote *>(target->wrapee()))
command->setKind(dtkComposerTransmitter::ProxyLoop);
command->setKind(dtkComposerTransmitter::ProxyVariant);
#endif
command->setNode(target);
}
if(e->direction == dtkComposerStackCommandReparentNodePrivate::Up) {
#if defined(DTK_BUILD_DISTRIBUTED)
if (dynamic_cast<dtkComposerNodeRemote *>(source->wrapee()))
command->setKind(dtkComposerTransmitter::ProxyLoop);
command->setKind(dtkComposerTransmitter::ProxyVariant);
#endif
command->setNode(source);
}
......
......@@ -223,7 +223,7 @@ bool dtkComposerWidget::insert(QString file)
void dtkComposerWidget::updateRemotes(dtkComposerSceneNodeComposite *composite)
{
#if defined(DTK_BUILD_SUPPORT_DISTRIBUTED)
#if defined(DTK_BUILD_DISTRIBUTED)
foreach(dtkComposerSceneNode *node, composite->nodes()) {
if (dtkComposerNodeRemote *remote = dynamic_cast<dtkComposerNodeRemote *>(node->wrapee()))
remote->setComposition(d->writer->toXML(dynamic_cast<dtkComposerSceneNodeComposite *>(node)));
......
......@@ -221,6 +221,8 @@ QDomElement dtkComposerWriter::writeNode(dtkComposerSceneNode *node, QDomElement
property.setAttribute("kind", "proxy");
if (port->node()->wrapee()->receivers().at(port->node()->inputPorts().indexOf(port))->kind() == dtkComposerTransmitter::ProxyLoop)
property.setAttribute("kind", "proxyloop");
if (port->node()->wrapee()->receivers().at(port->node()->inputPorts().indexOf(port))->kind() == dtkComposerTransmitter::ProxyVariant)
property.setAttribute("kind", "proxyvariant");
tag.appendChild(property);
}
......@@ -242,6 +244,8 @@ QDomElement dtkComposerWriter::writeNode(dtkComposerSceneNode *node, QDomElement
property.setAttribute("kind", "proxy");
if (port->node()->wrapee()->emitters().at(port->node()->outputPorts().indexOf(port))->kind() == dtkComposerTransmitter::ProxyLoop)
property.setAttribute("kind", "proxyloop");
if (port->node()->wrapee()->emitters().at(port->node()->outputPorts().indexOf(port))->kind() == dtkComposerTransmitter::ProxyVariant)
property.setAttribute("kind", "proxyvariant");
// --- twin ports
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment