Commit 86daf6a2 authored by S. Lackner

[code] allow profiles to set FAILED as return code to abort jobs

parent be859270
......@@ -347,6 +347,9 @@ string job_state_to_string(JobState state)
case JobState::JOB_STATE_COMPLETED_SUCCESSFULLY:
job_state = "COMPLETED_SUCCESSFULLY";
break;
case JobState::JOB_STATE_COMPLETED_FAILED:
job_state = "COMPLETED_FAILED";
break;
case JobState::JOB_STATE_COMPLETED_KILLED:
job_state = "COMPLETED_KILLED";
break;
......@@ -368,6 +371,8 @@ JobState job_state_from_string(string state)
new_state = JobState::JOB_STATE_RUNNING;
} else if (state == "COMPLETED_SUCCESSFULLY") {
new_state = JobState::JOB_STATE_COMPLETED_SUCCESSFULLY;
} else if (state == "COMPLETED_FAILED") {
new_state = JobState::JOB_STATE_COMPLETED_FAILED;
} else if (state == "COMPLETED_KILLED") {
new_state = JobState::JOB_STATE_COMPLETED_KILLED;
} else if (state == "REJECTED") {
......
......@@ -26,7 +26,8 @@ enum class JobState
JOB_STATE_NOT_SUBMITTED //!< The job exists but cannot be scheduled yet.
,JOB_STATE_SUBMITTED //!< The job has been submitted, it can now be scheduled.
,JOB_STATE_RUNNING //!< The job has been scheduled and is currently being processed.
,JOB_STATE_COMPLETED_SUCCESSFULLY //!< The job execution finished before its walltime.
,JOB_STATE_COMPLETED_SUCCESSFULLY //!< The job execution finished before its walltime successfully.
,JOB_STATE_COMPLETED_FAILED //!< The job execution finished before its walltime but the job failed.
,JOB_STATE_COMPLETED_KILLED //!< The job has been killed.
,JOB_STATE_REJECTED //!< The job has been rejected by the scheduler.
};
......@@ -60,6 +61,7 @@ struct Job
MachineRange allocation; //!< The machines on which the job has been executed.
std::vector<int> smpi_ranks_to_hosts_mapping; //!< If the job uses a SMPI profile, stores which host number each MPI rank should use. These numbers must be in [0,required_nb_res[.
JobState state; //!< The current state of the job
int return_code = -1; //!< The return code of the job
/**
* @brief Creates a new-allocated Job from a JSON description
......
......@@ -256,11 +256,11 @@ int execute_profile(BatsimContext *context,
msg_error_t err = MSG_parallel_task_execute_with_timeout(ptask, *remaining_time);
*remaining_time = *remaining_time - (MSG_get_clock() - time_before_execute);
int ret = 1;
int ret = profile->return_code;
if (err == MSG_OK) {}
else if (err == MSG_TIMEOUT)
{
ret = 0;
ret = -1;
}
else
{
......@@ -283,14 +283,15 @@ int execute_profile(BatsimContext *context,
{
for (unsigned int j = 0; j < data->sequence.size(); j++)
{
if (execute_profile(context, data->sequence[j], allocation,
cleanup_data, remaining_time) == 0)
int ret_last_profile = execute_profile(context, data->sequence[j], allocation,
cleanup_data, remaining_time);
if (ret_last_profile != 0)
{
return 0;
return ret_last_profile;
}
}
}
return 1;
return profile->return_code;
}
else if (profile->type == ProfileType::DELAY)
{
......@@ -302,7 +303,7 @@ int execute_profile(BatsimContext *context,
MSG_process_sleep(data->delay);
XBT_INFO("Sleeping done");
*remaining_time = *remaining_time - data->delay;
return 1;
return profile->return_code;
}
else
{
......@@ -310,7 +311,7 @@ int execute_profile(BatsimContext *context,
MSG_process_sleep(*remaining_time);
XBT_INFO("Walltime reached");
*remaining_time = 0;
return 0;
return -1;
}
}
else if (profile->type == ProfileType::SMPI)
......@@ -382,13 +383,13 @@ int execute_profile(BatsimContext *context,
}
MSG_sem_acquire(sem);
free(sem);
return 1;
return profile->return_code;
}
else
xbt_die("Cannot execute job %s: the profile type '%s' is unknown",
job->id.c_str(), job->profile.c_str());
return 0;
return 1;
}
int execute_job_process(int argc, char *argv[])
......@@ -420,11 +421,17 @@ int execute_job_process(int argc, char *argv[])
CleanExecuteProfileData * cleanup_data = new CleanExecuteProfileData;
cleanup_data->exec_process_args = args;
SIMIX_process_on_exit(MSG_process_self(), execute_profile_cleanup, cleanup_data);
if (execute_profile(args->context, job->profile, args->allocation, cleanup_data, &remaining_time) == 1)
job->return_code = execute_profile(args->context, job->profile, args->allocation, cleanup_data, &remaining_time);
if (job->return_code == 0)
{
XBT_INFO("Job %s finished in time", job->id.c_str());
XBT_INFO("Job %s finished in time (success)", job->id.c_str());
job->state = JobState::JOB_STATE_COMPLETED_SUCCESSFULLY;
}
else if (job->return_code > 0)
{
XBT_INFO("Job %s finished in time (failed)", job->id.c_str());
job->state = JobState::JOB_STATE_COMPLETED_FAILED;
}
else
{
XBT_INFO("Job %s had been killed (walltime %g reached)",
......@@ -553,7 +560,8 @@ int killer_process(int argc, char *argv[])
xbt_assert(job->state == JobState::JOB_STATE_RUNNING ||
job->state == JobState::JOB_STATE_COMPLETED_KILLED ||
job->state == JobState::JOB_STATE_COMPLETED_SUCCESSFULLY,
job->state == JobState::JOB_STATE_COMPLETED_SUCCESSFULLY ||
job->state == JobState::JOB_STATE_COMPLETED_FAILED,
"Bad kill: job %s is not running", job->id.c_str());
if (job->state == JobState::JOB_STATE_RUNNING)
......
......@@ -220,6 +220,13 @@ Profile *Profile::from_json(const std::string & profile_name,
error_prefix.c_str(), profile_name.c_str());
string profile_type = json_desc["type"].GetString();
int return_code = 0;
if (json_desc.HasMember("ret")) {
return_code = json_desc["ret"].GetInt();
}
profile->return_code = return_code;
if (profile_type == "delay")
{
profile->type = ProfileType::DELAY;
......
......@@ -38,6 +38,7 @@ struct Profile
ProfileType type; //!< The type of the profile
void * data; //!< The associated data
std::string json_description; //!< The JSON description of the profile
int return_code = 0; //!< The return code of this profile's execution (SUCCESS == 0)
/**
* @brief Creates a new-allocated Profile from a JSON description
......
......@@ -195,6 +195,7 @@ void JsonProtocolWriter::append_job_completed(const string & job_id,
const string & job_status,
const string & job_state,
const string & kill_reason,
int return_code,
double date)
{
/* {
......@@ -213,6 +214,7 @@ void JsonProtocolWriter::append_job_completed(const string & job_id,
data.AddMember("job_id", Value().SetString(job_id.c_str(), _alloc), _alloc);
data.AddMember("status", Value().SetString(job_status.c_str(), _alloc), _alloc);
data.AddMember("job_state", Value().SetString(job_state.c_str(), _alloc), _alloc);
data.AddMember("return_code", Value().SetInt(return_code), _alloc);
data.AddMember("kill_reason", Value().SetString(kill_reason.c_str(), _alloc), _alloc);
Value event(rapidjson::kObjectType);
......
......@@ -110,6 +110,7 @@ public:
const std::string & job_status,
const std::string & job_state,
const std::string & kill_reason,
int return_code,
double date) = 0;
/**
......@@ -229,6 +230,7 @@ public:
const std::string & job_status,
const std::string & job_state,
const std::string & kill_reason,
int return_code,
double date);
/**
......@@ -298,7 +300,7 @@ private:
rapidjson::Document _doc; //!< A rapidjson document
rapidjson::Document::AllocatorType & _alloc; //!< The allocator of _doc
rapidjson::Value _events = rapidjson::Value(rapidjson::kArrayType); //!< A rapidjson array in which the events are pushed
const std::vector<std::string> accepted_completion_statuses = {"SUCCESS", "TIMEOUT"}; //!< The list of accepted statuses for the JOB_COMPLETED message
const std::vector<std::string> accepted_completion_statuses = {"SUCCESS", "FAILED", "TIMEOUT"}; //!< The list of accepted statuses for the JOB_COMPLETED message
};
......
......@@ -209,6 +209,10 @@ void server_on_job_completed(ServerData * data,
{
status = "SUCCESS";
}
else if (job->state == JobState::JOB_STATE_COMPLETED_FAILED)
{
status = "FAILED";
}
else if (job->state == JobState::JOB_STATE_COMPLETED_KILLED && job->kill_reason == "Walltime reached")
{
status = "TIMEOUT";
......@@ -217,6 +221,7 @@ void server_on_job_completed(ServerData * data,
data->context->proto_writer->append_job_completed(message->job_id.to_string(),
status, job_state_to_string(job->state),
job->kill_reason,
job->return_code,
MSG_get_clock());
check_submitted_and_completed(data);
......@@ -645,6 +650,7 @@ void server_on_change_job_state(ServerData * data,
case JobState::JOB_STATE_RUNNING:
switch (new_state) {
case JobState::JOB_STATE_COMPLETED_SUCCESSFULLY:
case JobState::JOB_STATE_COMPLETED_FAILED:
case JobState::JOB_STATE_COMPLETED_KILLED:
job->runtime = MSG_get_clock() - job->starting_time;
data->nb_running_jobs--;
......@@ -653,7 +659,7 @@ void server_on_change_job_state(ServerData * data,
xbt_assert(data->nb_completed_jobs + data->nb_running_jobs <= data->nb_submitted_jobs);
break;
default:
xbt_assert(false, "Can only change the state of a running job to completed (successfully and killed)");
xbt_assert(false, "Can only change the state of a running job to completed (successfully, failed, and killed)");
}
break;
default:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment