Commit d179fde5 authored by FRIEDEMANN Sebastian's avatar FRIEDEMANN Sebastian
Browse files

Work on mechanic to kill crashed simulations

parent 517a4e38
......@@ -4,6 +4,7 @@
// TODO 2. error prone ness
// TODO: check ret values!
// TODO 3. check with real world sim and DA.
// TODO use model task runner instead of simu every where? Theo!
#include <map>
#include <string>
......@@ -11,6 +12,7 @@
#include <cassert>
#include <vector>
#include <set>
#include <mpi.h>
#include "zmq.h"
......@@ -31,6 +33,7 @@ const int MAX_TIMESTAMP = 5;
using namespace std;
const int TAG_NEW_TASK = 42;
const int TAG_REMOVE_RUNNER = 43;
size_t IDENTITY_SIZE = 0;
......@@ -312,15 +315,34 @@ struct Simulation // Model process runner
}
}
// // return true if it is ready to take work as doing nothing and it has a connection identity...
// bool ready() {
// for (auto cs = connected_simulation_ranks.begin(); cs != connected_simulation_ranks.end(); cs++) {
// if (cs->second.current_task != WANT_WORK || cs->second.connection_identity == NULL) {
// return false;
// }
// }
// return true;
// }
// Returns the largest current_task.state_id found among all connected simulation ranks.
// The search starts at -3, so -3 is returned when there are no connected ranks or when
// every rank reports a state id of -3 or lower. Non-work tasks carry negative state ids,
// so they are only returned if no connected rank is currently processing a real state.
int getCurrentState() {
    int highest_state = -3;
    for (const auto& rank_entry : connected_simulation_ranks) {
        const auto& task = rank_entry.second.current_task;
        if (task.state_id > highest_state) {
            highest_state = task.state_id;
        }
    }
    return highest_state;
}
// Collects the state ids of all tasks stored in the waiting_tasks of every connected
// simulation rank. Each state id appears at most once in the returned set.
set<int> getScheduledStates() {
    set<int> scheduled;
    for (const auto& rank_entry : connected_simulation_ranks) {
        for (const auto& task_entry : rank_entry.second.waiting_tasks) {
            scheduled.emplace(task_entry.second.state_id);
        }
    }
    return scheduled;
}
// Destructor: only emits a message through D() when a Simulation object goes away.
~Simulation() { D("Deleting Model Task Runner!"); }
};
// simu_id, Simulation:
......@@ -335,6 +357,8 @@ void answer_configuration_message(void * configuration_socket, char* data_respon
zmq_msg_recv(&msg, configuration_socket, 0);
int * buf = reinterpret_cast<int*>(zmq_msg_data(&msg));
if (buf[0] == REGISTER_SIMU_ID) {
// As we do not check the simulation IDs here, two simulations could register with the same id, which would lead to strange bugs.
// low: maybe we should store all registered simulation ids and check that no one tries to register twice.
// Register Simu ID
assert(zmq_msg_size(&msg) == 2 * sizeof(int));
assert(buf[1] >= 0); // don't allow negative simu_ids
......@@ -429,7 +453,7 @@ void broadcast_field_information_and_calculate_parts() {
}
}
// State ids that kill_simu() saves on rank 0 so they can be rescheduled later.
// This is a set (not a vector): all users call emplace(state_id) with a single argument,
// and a set guarantees that the same state id is never queued twice.
std::set<int> unscheduled_tasks;
/// returns true if it scheduled a new task for the given simuid
/// schedules a new task on a model task runner
bool schedule_new_task(int simuid)
......@@ -524,6 +548,107 @@ void do_update_step()
}
}
// Forward declaration; the definition follows further down in this file.
void check_kill_simu_message();
// Bookkeeping for a simulation that has to be killed.
//
// Looks up simu_id in `simulations`. If it is unknown, returns false. Otherwise, on rank 0
// only (rank 0 does the rescheduling), the state currently being processed (if it is >= 0)
// and all states still waiting on that simulation are added to `unscheduled_tasks`, and
// true is returned.
//
// NOTE(review): this function does not erase the entry from `simulations`, so in the code
// shown here a repeated call for the same id returns true again - confirm whether the
// removal happens elsewhere.
bool kill_simu(int simu_id) {
    auto found = simulations.find(simu_id);
    if (found == simulations.end()) {
        return false;
    }

    if (comm_rank == 0) {
        // Remember the state this simulation was working on, if it was a real one.
        const int running_state = found->second.getCurrentState();
        if (running_state >= 0) {
            unscheduled_tasks.emplace(running_state);
        }

        // Remember everything that was still queued on this simulation.
        const auto waiting_states = found->second.getScheduledStates();
        unscheduled_tasks.insert(waiting_states.begin(), waiting_states.end());
    }
    return true;
    // TODO: reset received count!
    // TODO: reschedule!
}
// sender is the one who detected that this simu needs to be killed.
void send_kill_simu_message(int simu_id, int sender) {
if (comm_rank == 0)
{
int num_receiver = comm_size - 1;
if (sender != comm_rank) {
num_receiver--;
}
MPI_Request requests[num_receiver];
int index_receiver = 0;
for (int rank = 1; rank < comm_size; rank++)
{
if (rank != sender) {
// The sender already knows it.
MPI_Isend(&simu_id, 1, MPI_INT, rank, TAG_REMOVE_RUNNER, MPI_COMM_WORLD, &requests[index_receiver]);
index_receiver++;
}
}
// wait until all server procs know that this simulation died.
for (int i = 0; i < num_receiver; ++i)
{
MPI_Wait(&requests[i], MPI_STATUS_IGNORE);
}
} else {
MPI_Send(&simu_id, 1, MPI_INT, 0, TAG_REMOVE_RUNNER, MPI_COMM_WORLD);
}
}
// Polls, without blocking when nothing is pending, for TAG_REMOVE_RUNNER messages and
// applies them through kill_simu().
//
// Rank 0 probes every other rank once. For each pending message it receives the simulation
// id from that rank and, if kill_simu() returns true, forwards the id to all remaining
// ranks via send_kill_simu_message().
// Every other rank probes rank 0 once and calls kill_simu() on the received id.
// At most one message per probed rank is consumed per call.
void check_kill_simu_message() {
    if (comm_rank == 0) {
        // Rank 0 checks whether one of the others sent a remove message. If so, it is
        // forwarded to all the OTHER ranks that do not know it yet.
        for (int rank = 1; rank < comm_size; rank++) {
            int received = 0;
            MPI_Iprobe(rank, TAG_REMOVE_RUNNER, MPI_COMM_WORLD, &received, MPI_STATUS_IGNORE);
            if (!received) {
                continue;
            }

            int simu_id;
            // Receive from the rank that was just probed. Receiving from source 0 here could
            // never match: rank 0 does not send this tag to itself, so the call would block.
            MPI_Recv(&simu_id, 1, MPI_INT, rank, TAG_REMOVE_RUNNER, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
            if (kill_simu(simu_id)) {
                // kill_simu() reported that this simulation was still known here:
                send_kill_simu_message(simu_id, rank);
            }
        }
    } else {
        // Only check if rank 0 sent a kill simulation message.
        int received = 0;
        MPI_Iprobe(0, TAG_REMOVE_RUNNER, MPI_COMM_WORLD, &received, MPI_STATUS_IGNORE);
        if (received) {
            int simu_id;
            MPI_Recv(&simu_id, 1, MPI_INT, 0, TAG_REMOVE_RUNNER, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
            kill_simu(simu_id);
        }
    }
}
int main(int argc, char * argv[])
{
assert(MAX_TIMESTAMP > 1);
......@@ -769,6 +894,12 @@ int main(int argc, char * argv[])
}
}
// Check for remove simulation messages (non-blocking poll, see check_kill_simu_message()):
check_kill_simu_message();
// TODO: if receiving message to shut down simulation: kill simulation (mpi probe for it...)
// TODO: check if we need to kill some simulations...
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment