Commit af4ac467 authored by Millian Poquet's avatar Millian Poquet

[code] use json writer

parent 82ebeb2f
......@@ -31,6 +31,7 @@
#include "machines.hpp"
#include "network.hpp"
#include "profiles.hpp"
#include "protocol.hpp"
#include "server.hpp"
#include "workload.hpp"
#include "workflow.hpp"
......@@ -584,6 +585,10 @@ int main(int argc, char * argv[])
context.zmq_socket = new zmq::socket_t(context.zmq_context, ZMQ_REQ);
context.zmq_socket->connect(main_args.socket_endpoint);
// Let's create the protocol reader and writer
context.proto_reader = new JsonProtocolReader(&context);
context.proto_writer = new JsonProtocolWriter;
// Let's store some metadata about the current instance in the data storage
context.storage.set("nb_res", std::to_string(context.machines.nb_machines()));
......@@ -602,6 +607,12 @@ int main(int argc, char * argv[])
delete context.zmq_socket;
context.zmq_socket = nullptr;
delete context.proto_reader;
context.proto_reader = nullptr;
delete context.proto_writer;
context.proto_writer = nullptr;
// If SMPI had been used, it should be finalized
if (context.smpi_used)
SMPI_finalize();
......
......@@ -16,6 +16,7 @@
#include "machines.hpp"
#include "network.hpp"
#include "profiles.hpp"
#include "protocol.hpp"
#include "pstate.hpp"
#include "storage.hpp"
#include "workflow.hpp"
......@@ -33,6 +34,8 @@ struct BatsimContext
{
zmq::context_t zmq_context; //!< The Zero MQ context
zmq::socket_t * zmq_socket = nullptr; //!< The Zero MQ socket (REQ)
AbstractProtocolReader * proto_reader = nullptr;//!< The protocol reader
AbstractProtocolWriter * proto_writer = nullptr;//!< The protocol writer
Machines machines; //!< The machines
Workloads workloads; //!< The workloads
......
......@@ -18,7 +18,10 @@ XBT_LOG_NEW_DEFAULT_CATEGORY(ipp, "ipp"); //!< Logging
* @param data TODO
* @param detached TODO
*/
void generic_send_message(const std::string & destination_mailbox, IPMessageType type, void * data, bool detached)
void generic_send_message(const std::string & destination_mailbox,
IPMessageType type,
void * data,
bool detached)
{
IPMessage * message = new IPMessage;
message->type = type;
......
......@@ -667,7 +667,7 @@ void JsonProtocolReader::handle_set_resource_state(int event_number, double time
void JsonProtocolReader::handle_notify(int event_number, double timestamp, const Value &data_object)
{
xbt_assert(false, "Unimplemented");
xbt_assert(false, "Unimplemented! TODO");
(void) event_number;
(void) timestamp;
(void) data_object;
......@@ -675,7 +675,7 @@ void JsonProtocolReader::handle_notify(int event_number, double timestamp, const
void JsonProtocolReader::handle_submit_job(int event_number, double timestamp, const Value &data_object)
{
xbt_assert(false, "Unimplemented");
xbt_assert(false, "Unimplemented! TODO");
(void) event_number;
(void) timestamp;
(void) data_object;
......@@ -683,7 +683,7 @@ void JsonProtocolReader::handle_submit_job(int event_number, double timestamp, c
void JsonProtocolReader::handle_kill_job(int event_number, double timestamp, const Value &data_object)
{
xbt_assert(false, "Unimplemented");
xbt_assert(false, "Unimplemented! TODO");
(void) event_number;
(void) timestamp;
(void) data_object;
......
......@@ -37,7 +37,6 @@ int server_process(int argc, char *argv[])
int nb_running_jobs = 0;
int nb_switching_machines = 0;
int nb_waiters = 0;
const int protocol_version = 3;
bool sched_ready = true;
bool all_jobs_submitted_and_completed = false;
bool end_of_simulation_sent = false;
......@@ -60,11 +59,12 @@ int server_process(int argc, char *argv[])
string send_buffer;
// Let's tell the Decision process that the simulation is about to begin (and that some data can be read from the data storage)
send_buffer = "|" + std::to_string(MSG_get_clock()) + ":A";
context->proto_writer->append_simulation_begins(MSG_get_clock());
RequestReplyProcessArguments * req_rep_args = new RequestReplyProcessArguments;
req_rep_args->context = context;
req_rep_args->send_buffer = to_string(protocol_version) + ":" + to_string(MSG_get_clock()) + send_buffer;
send_buffer.clear();
req_rep_args->send_buffer = context->proto_writer->generate_current_message(MSG_get_clock());
context->proto_writer->clear();
MSG_process_create("Scheduler REQ-REP", request_reply_scheduler_process, (void*)req_rep_args, MSG_host_self());
sched_ready = false;
......@@ -128,8 +128,7 @@ int server_process(int argc, char *argv[])
all_jobs_submitted_and_completed = true;
XBT_INFO("It seems that all jobs have been submitted and completed!");
send_buffer += "|" + std::to_string(MSG_get_clock()) + ":Z";
XBT_DEBUG("Message to send to scheduler: %s", send_buffer.c_str());
context->proto_writer->append_simulation_ends(MSG_get_clock());
}
} break; // end of case SUBMITTER_BYE
......@@ -159,8 +158,13 @@ int server_process(int argc, char *argv[])
XBT_INFO("Job %s!%d COMPLETED. %d jobs completed so far",
job->workload->name.c_str(), job->number, nb_completed_jobs);
send_buffer += '|' + std::to_string(MSG_get_clock()) + ":C:" + message->job_id.to_string();
XBT_DEBUG( "Message to send to scheduler: %s", send_buffer.c_str());
string status = "UNKNOWN";
if (job->state == JobState::JOB_STATE_COMPLETED_SUCCESSFULLY)
status = "SUCCESS";
else if (job->state == JobState::JOB_STATE_COMPLETED_KILLED && job->kill_reason == "Walltime reached")
status = "TIMEOUT";
context->proto_writer->append_job_completed(message->job_id.to_string(), status, MSG_get_clock());
if (!all_jobs_submitted_and_completed &&
nb_completed_jobs == nb_submitted_jobs &&
......@@ -169,8 +173,7 @@ int server_process(int argc, char *argv[])
all_jobs_submitted_and_completed = true;
XBT_INFO("It seems that all jobs have been submitted and completed!");
send_buffer += "|" + std::to_string(MSG_get_clock()) + ":Z";
XBT_DEBUG( "Message to send to scheduler: %s", send_buffer.c_str());
context->proto_writer->append_simulation_ends(MSG_get_clock());
}
} break; // end of case JOB_COMPLETED
......@@ -207,9 +210,7 @@ int server_process(int argc, char *argv[])
XBT_INFO("Job %s SUBMITTED. %d jobs submitted so far",
message->job_id.to_string().c_str(), nb_submitted_jobs);
send_buffer += "|" + std::to_string(MSG_get_clock()) + ":S:" + message->job_id.to_string();
XBT_DEBUG( "Message to send to scheduler: '%s'", send_buffer.c_str());
context->proto_writer->append_job_submitted({job->id}, MSG_get_clock());
} break; // end of case JOB_SUBMITTED
case IPMessageType::JOB_SUBMITTED_BY_DP:
......@@ -317,8 +318,8 @@ int server_process(int argc, char *argv[])
if (context->current_switches.mark_switch_as_done(machine->id, message->new_pstate,
reply_message_content, context))
{
send_buffer += "|" + std::to_string(MSG_get_clock()) + ":p:" + reply_message_content;
XBT_DEBUG("Message to send to scheduler : '%s'", send_buffer.c_str());
xbt_assert(false, "Unimplemented! TODO");
//context->proto_writer->append_resource_state_changed();
}
}
else if (machine->pstates[message->new_pstate] == PStateType::SLEEP_PSTATE)
......@@ -460,8 +461,7 @@ int server_process(int argc, char *argv[])
case IPMessageType::WAITING_DONE:
{
send_buffer += "|" + std::to_string(MSG_get_clock()) + ":N";
XBT_DEBUG("Message to send to scheduler: '%s'", send_buffer.c_str());
context->proto_writer->append_nop(MSG_get_clock());
--nb_waiters;
} break; // end of case WAITING_DONE
......@@ -483,14 +483,10 @@ int server_process(int argc, char *argv[])
case IPMessageType::WAIT_QUERY:
{
WaitQueryMessage * message = (WaitQueryMessage *) task_data->data;
//WaitQueryMessage * message = (WaitQueryMessage *) task_data->data;
// XBT_INFO("received : %s , %s\n", to_string(message->nb_resources).c_str(), to_string(message->processing_time).c_str());
send_buffer += "|" + std::to_string(MSG_get_clock()) + ":Q:"
+ message->submitter_name.c_str() + ","
+ to_string(message->nb_resources).c_str() + ","
+ boost::lexical_cast<string>(message->processing_time).c_str();
// XBT_INFO("INFO!!! Message to send to scheduler : '%s'", send_buffer.c_str());
xbt_assert(false, "Unimplemented! TODO");
//Submitter * submitter = submitters.at(message->submitter_name);
//origin_of_wait_queries[{message->nb_resources,message->processing_time}] = submitter;
......@@ -513,8 +509,8 @@ int server_process(int argc, char *argv[])
if (context->trace_machine_states)
context->machine_state_tracer.write_machine_states(MSG_get_clock());
send_buffer += "|" + std::to_string(MSG_get_clock()) + ":p:" + reply_message_content;
XBT_DEBUG("Message to send to scheduler : '%s'", send_buffer.c_str());
xbt_assert(false, "Unimplemented! TODO");
//context->proto_writer->append_resource_state_changed();
}
--nb_switching_machines;
......@@ -537,8 +533,8 @@ int server_process(int argc, char *argv[])
if (context->trace_machine_states)
context->machine_state_tracer.write_machine_states(MSG_get_clock());
send_buffer += "|" + std::to_string(MSG_get_clock()) + ":p:" + reply_message_content;
XBT_DEBUG("Message to send to scheduler : '%s'", send_buffer.c_str());
xbt_assert(false, "Unimplemented! TODO");
//context->proto_writer->append_resource_state_changed();
}
--nb_switching_machines;
......@@ -547,10 +543,7 @@ int server_process(int argc, char *argv[])
case IPMessageType::SCHED_TELL_ME_ENERGY:
{
long double total_consumed_energy = context->machines.total_consumed_energy(context);
send_buffer += "|" + std::to_string(MSG_get_clock()) + ":e:" +
std::to_string(total_consumed_energy);
XBT_DEBUG("Message to send to scheduler : '%s'", send_buffer.c_str());
context->proto_writer->append_query_reply_energy(total_consumed_energy, MSG_get_clock());
} break; // end of case SCHED_TELL_ME_ENERGY
case IPMessageType::SUBMITTER_CALLBACK:
......@@ -568,12 +561,12 @@ int server_process(int argc, char *argv[])
delete task_data;
MSG_task_destroy(task_received);
if (sched_ready && !send_buffer.empty() && !end_of_simulation_sent)
if (sched_ready && !context->proto_writer->is_empty() && !end_of_simulation_sent)
{
RequestReplyProcessArguments * req_rep_args = new RequestReplyProcessArguments;
req_rep_args->context = context;
req_rep_args->send_buffer = to_string(protocol_version) + ":" + to_string(MSG_get_clock()) + send_buffer;
send_buffer.clear();
req_rep_args->send_buffer = context->proto_writer->generate_current_message(MSG_get_clock());
context->proto_writer->clear();
MSG_process_create("Scheduler REQ-REP", request_reply_scheduler_process, (void*)req_rep_args, MSG_host_self());
sched_ready = false;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment