Commit 33a854c5 authored by Millian Poquet's avatar Millian Poquet

Warning fix: the same msg_task_t was used to receive tasks which caused SG to...

Warning fix: the same msg_task_t was used to receive tasks which caused SG to display a warning at each MSG_task_receive call. Bug fix: bad initialization of many classes. Bug fix: bad deletion of the buffer in request_reply_scheduler_process. The MSG hello world workload now works! Bigger workloads do not work yet. SMPI does not seem to work either.
parent e56ca13c
......@@ -128,6 +128,7 @@ int main(int argc, char * argv[])
context.jobs.setProfiles(&context.profiles);
context.tracer.setFilename(mainArgs.exportPrefix + "_schedule.trace");
// TODO: check jobs & profile validity
//context.jobs.displayDebug();
bool smpi_used = context.jobs.containsSMPIJob();
if (!smpi_used)
......@@ -139,6 +140,8 @@ int main(int argc, char * argv[])
context.machines.createMachines(hosts, mainArgs.masterHostName);
xbt_dynar_free(&hosts);
const Machine * masterMachine = context.machines.masterMachine();
context.machines.setTracer(&context.tracer);
context.tracer.initialize(&context, MSG_get_clock());
// Socket
context.socket.create_socket(mainArgs.socketFilename);
......@@ -156,6 +159,7 @@ int main(int argc, char * argv[])
msg_error_t res = MSG_main();
// Finalization
context.tracer.finalize(&context, MSG_get_clock());
exportScheduleToCSV(mainArgs.exportPrefix + "_schedule.csv", MSG_get_clock(), &context);
if (res == MSG_OK)
......
......@@ -94,7 +94,7 @@ PajeTracer::~PajeTracer()
}
}
void PajeTracer::initialize(const vector<Machine> & machines, double time)
void PajeTracer::initialize(BatsimContext *context, double time)
{
xbt_assert(state == UNINITIALIZED, "Bad PajeTracer::initialize call: the object is not UNINITIALIZED");
......@@ -216,14 +216,14 @@ void PajeTracer::initialize(const vector<Machine> & machines, double time)
CREATE_CONTAINER, time, rootType, root);
_wbuf->appendText(buf);
for (const Machine & m : machines)
for (const Machine * m : context->machines.machines())
{
// todo : clean machine name
snprintf(buf, bufSize,
"%d %lf %s %s%d \"%s\" %s\n",
CREATE_CONTAINER, time, machineType,
machinePrefix, m.id,
m.name.c_str(), root);
machinePrefix, m->id,
m->name.c_str(), root);
_wbuf->appendText(buf);
}
......@@ -255,18 +255,18 @@ void PajeTracer::initialize(const vector<Machine> & machines, double time)
_wbuf->appendText(buf);
// Let's set all the machines in waiting state
for (const Machine & m : machines)
for (const Machine * m : context->machines.machines())
{
snprintf(buf, bufSize,
"%d %lf %s %s%d %s\n",
SET_STATE, time, machineState, machinePrefix, m.id, mstateWaiting);
SET_STATE, time, machineState, machinePrefix, m->id, mstateWaiting);
_wbuf->appendText(buf);
}
state = INITIALIZED;
}
void PajeTracer::finalize(const vector<Machine> & machines, double time)
void PajeTracer::finalize(BatsimContext * context, double time)
{
xbt_assert(state == INITIALIZED, "Bad PajeTracer::finalize call: the object has not been initialized yet");
......@@ -278,11 +278,11 @@ void PajeTracer::finalize(const vector<Machine> & machines, double time)
"# End of events, containers destruction\n");
_wbuf->appendText(buf);
for (const Machine & m : machines)
for (const Machine * m : context->machines.machines())
{
snprintf(buf, bufSize,
"%d %lf %s%d %s\n",
DESTROY_CONTAINER, time, machinePrefix, m.id, machineType);
DESTROY_CONTAINER, time, machinePrefix, m->id, machineType);
_wbuf->appendText(buf);
}
......
......@@ -84,17 +84,19 @@ public:
/**
* @brief Initializes a PajeTracer.
* @details This function must be called once before adding job launchings, runnings or endings.
* TODO UPDATE
* @param machines The machines
*/
void initialize(const std::vector<Machine> & machines, double time);
void initialize(BatsimContext * context, double time);
/**
* @brief Finalizes a PajeTracer.
* @details This function must be called before the PajeTracer's object destruction.
* @param machines The machines
* TODO UPDATE
* @param time The simulation time at which the finalization is done
*/
void finalize(const std::vector<Machine> & machines, double time);
void finalize(BatsimContext * context, double time);
/**
* @brief Adds a job launch in the file trace.
......
......@@ -36,7 +36,7 @@ struct SchedulingAllocation
{
int job_id;
std::vector<int> machine_ids; //! The IDs of the machines on which the job should be allocated
std::vector<msg_host_t> hosts; //! The corresponding SimGrid hosts
std::vector<msg_host_t> hosts; //! The corresponding SimGrid hosts
};
struct SchedulingAllocationMessage
......
......@@ -9,6 +9,9 @@
#include <streambuf>
#include <algorithm>
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/join.hpp>
#include <simgrid/msg.h>
#include <rapidjson/document.h>
......@@ -119,6 +122,7 @@ bool Jobs::exists(int job_id) const
bool Jobs::containsSMPIJob() const
{
xbt_assert(_profiles != nullptr, "Invalid Jobs::containsSMPIJob call: setProfiles had not been called yet");
for (auto & mit : _jobs)
{
Job * job = mit.second;
......@@ -128,6 +132,25 @@ bool Jobs::containsSMPIJob() const
return false;
}
void Jobs::displayDebug() const
{
// Let us traverse jobs to display some information about them
vector<string> jobsVector;
for (auto & mit : _jobs)
{
jobsVector.push_back(std::to_string(mit.second->id));
}
// Let us create the string that will be sent to XBT_INFO
string s = "Jobs debug information:\n";
s += "There are " + to_string(_jobs.size()) + " jobs.\n";
s += "Jobs : [" + boost::algorithm::join(jobsVector, ", ") + "]";
// Let us display the string which has been built
XBT_INFO("%s", s.c_str());
}
const std::map<int, Job* > &Jobs::jobs() const
{
return _jobs;
......
......@@ -49,6 +49,8 @@ public:
bool exists(int job_id) const;
bool containsSMPIJob() const;
void displayDebug() const;
const std::map<int, Job*> & jobs() const;
private:
......
......@@ -92,7 +92,10 @@ void Machines::updateMachinesOnJobRun(int jobID, const std::vector<int> & usedMa
// cout << machine;
if (previous_top_job == -1 || previous_top_job != *machine->jobs_being_computed.begin())
{
xbt_assert(_tracer != nullptr, "Invalid Machines::updateMachinesOnJobRun call: setTracer has not been called");
_tracer->set_machine_as_computing_job(machine->id, *machine->jobs_being_computed.begin(), MSG_get_clock());
}
}
}
......@@ -113,10 +116,12 @@ void Machines::updateMachinesOnJobEnd(int jobID, const std::vector<int> & usedMa
if (machine->jobs_being_computed.empty())
{
machine->state = MachineState::IDLE;
xbt_assert(_tracer != nullptr, "Invalid Machines::updateMachinesOnJobRun call: setTracer has not been called");
_tracer->set_machine_idle(machine->id, MSG_get_clock());
}
else if (*machine->jobs_being_computed.begin() != previous_top_job)
{
xbt_assert(_tracer != nullptr, "Invalid Machines::updateMachinesOnJobRun call: setTracer has not been called");
_tracer->set_machine_as_computing_job(machine->id, *machine->jobs_being_computed.begin(), MSG_get_clock());
}
......
......@@ -14,6 +14,7 @@
#include "context.hpp"
#include "ipp.hpp"
#include "jobs_execution.hpp"
XBT_LOG_NEW_DEFAULT_CATEGORY(network, "network");
......@@ -109,27 +110,6 @@ void UnixDomainSocket::send(const string & message)
write(_client_socket, (void*)message.c_str(), message_size);
}
/*int main()
{
UnixDomainSocket socket("/tmp/bouh");
socket.accept_pending_connection();
bool ok = true;
string s;
for (int i = 0; ok; ++i)
{
socket.send("Hello " + to_string(i));
s = socket.receive();
printf("Received : '%s'\n", s.c_str());
if (s.empty())
ok = false;
}
return 0;
}*/
int request_reply_scheduler_process(int argc, char *argv[])
{
(void) argc;
......@@ -145,7 +125,6 @@ int request_reply_scheduler_process(int argc, char *argv[])
XBT_INFO("Buffer received in REQ-REP: '%s'", sendBuf);
context->socket.send(sendBuf);
free(sendBuf);
auto start = chrono::steady_clock::now();
string message_received = context->socket.receive();
......@@ -288,9 +267,6 @@ int uds_server_process(int argc, char *argv[])
ServerProcessArguments * args = (ServerProcessArguments *) MSG_process_get_data(MSG_process_self());
BatsimContext * context = args->context;
msg_task_t task_received = NULL;
IPMessage * task_data;
int nb_completed_jobs = 0;
int nb_submitted_jobs = 0;
int nb_scheduled_jobs = 0;
......@@ -306,6 +282,8 @@ int uds_server_process(int argc, char *argv[])
(nb_completed_jobs < nb_submitted_jobs) || !sched_ready)
{
// Let's wait a message from a node or the request-reply process...
msg_task_t task_received = NULL;
IPMessage * task_data;
MSG_task_receive(&(task_received), "server");
task_data = (IPMessage *) MSG_task_get_data(task_received);
......@@ -377,8 +355,6 @@ int uds_server_process(int argc, char *argv[])
}
string submittedJobsString = boost::algorithm::join(submittedJobs, ", ");
// todo: transform submittedJobs to a string
XBT_INFO("The available jobs are [%s]", submittedJobsString.c_str());
}
......@@ -403,7 +379,8 @@ int uds_server_process(int argc, char *argv[])
exec_args->context = context;
exec_args->allocation = allocation;
// TODO : launch the job.
//MSG_process_create("job " + to_string(job->id), launch_job, message, context->machines[allocation.machine_ids[0]]->host);
string pname = "job" + to_string(job->id);
MSG_process_create(pname.c_str(), execute_job_process, (void*)exec_args, context->machines[allocation.machine_ids[0]]->host);
}
break;
......@@ -415,8 +392,8 @@ int uds_server_process(int argc, char *argv[])
}
} // end of switch
MSG_task_destroy(task_received);
delete task_data;
MSG_task_destroy(task_received);
if (sched_ready && !send_buffer.empty())
{
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment