Commit c78b6b2d authored by PERE Alexandre's avatar PERE Alexandre

Merge branch '34-add-stdout-stderr-and-ecode-to-scheduler-message' into 'develop'

Resolve "Add stdout, stderr and ecode to scheduler message"

See merge request !23
parents 509fd731 57514cb3
...@@ -36,6 +36,7 @@ use tracing_futures::Instrument; ...@@ -36,6 +36,7 @@ use tracing_futures::Instrument;
use std::os::unix::process::CommandExt; use std::os::unix::process::CommandExt;
use libc::{signal, SIGINT, SIG_IGN}; use libc::{signal, SIGINT, SIG_IGN};
use std::process::Stdio; use std::process::Stdio;
use std::path::PathBuf;
//----------------------------------------------------------------------------------------- MESSAGES //----------------------------------------------------------------------------------------- MESSAGES
...@@ -46,11 +47,12 @@ use std::process::Stdio; ...@@ -46,11 +47,12 @@ use std::process::Stdio;
/// This enumeration represents the different request messages that can be sent to the command. Those /// This enumeration represents the different request messages that can be sent to the command. Those
/// requests will be serialized to the following jsons when sent to the command stdin: /// requests will be serialized to the following jsons when sent to the command stdin:
pub enum RequestMessages{ pub enum RequestMessages{
/// Example of json transcript: `{"GET_PARAMETERS_REQUEST": {}}` /// Example of json transcript: `{"GET_PARAMETERS_REQUEST": {"uuid": "kkkagr23451"}}`
GetParametersRequest{}, GetParametersRequest{ uuid: String },
/// Example of json transcript: `{"RECORD_OUTPUT_REQUEST": {"parametes": "some params", /// Example of json transcript: `{"RECORD_OUTPUT_REQUEST": {"uuid": "kkkagr23451", "parametes":
/// "some params", "stdout": "some mess", "stderr": "some mess", "ecode": 0, "path": "/home",
/// "features": "[0.5, 0.5]"} }` /// "features": "[0.5, 0.5]"} }`
RecordOutputRequest{ parameters: String, features: String}, RecordOutputRequest{ uuid: String, parameters: String, stdout: String, stderr: String, ecode: i32, features: String, path: String},
// Example of json transcript: `{"SHUTDOWN_REQUEST": {}}` // Example of json transcript: `{"SHUTDOWN_REQUEST": {}}`
ShutdownRequest{}, ShutdownRequest{},
} }
...@@ -76,7 +78,7 @@ pub enum ResponseMessages{ ...@@ -76,7 +78,7 @@ pub enum ResponseMessages{
//------------------------------------------------------------------------------------------- MACROS //------------------------------------------------------------------------------------------- MACROS
/// This macro allows to send a particular request to the scheduler, and retrieve the output /// This macro allows to send a particular request to the scheduler, and retrieve the output.
#[macro_export] #[macro_export]
macro_rules! query_command { macro_rules! query_command {
($sched: expr, $req: expr ) => { ($sched: expr, $req: expr ) => {
...@@ -220,7 +222,7 @@ impl Scheduler { ...@@ -220,7 +222,7 @@ impl Scheduler {
/// Inner future containing the logic to request parameters. /// Inner future containing the logic to request parameters.
#[instrument(name="Scheduler::request_parameters", skip(sched))] #[instrument(name="Scheduler::request_parameters", skip(sched))]
async fn request_parameters(sched: Arc<Mutex<Scheduler>>) -> Result<String, Error> { async fn request_parameters(sched: Arc<Mutex<Scheduler>>, uuid: String) -> Result<String, Error> {
trace!("Requesting parameters"); trace!("Requesting parameters");
loop{ loop{
let response = { let response = {
...@@ -236,7 +238,7 @@ impl Scheduler { ...@@ -236,7 +238,7 @@ impl Scheduler {
} }
// We query the command // We query the command
let request = RequestMessages::GetParametersRequest{}; let request = RequestMessages::GetParametersRequest{uuid: uuid.clone()};
query_command!(sched, &request)? query_command!(sched, &request)?
}; };
...@@ -253,7 +255,9 @@ impl Scheduler { ...@@ -253,7 +255,9 @@ impl Scheduler {
/// Inner future containing the logic to record an output. /// Inner future containing the logic to record an output.
#[instrument(name="Scheduler::record_output", skip(sched))] #[instrument(name="Scheduler::record_output", skip(sched))]
async fn record_output(sched: Arc<Mutex<Scheduler>>, parameters: String, features: String) -> Result<(), Error>{ async fn record_output(sched: Arc<Mutex<Scheduler>>, uuid: String, parameters: String,
stdout: String, stderr: String, ecode: i32, features: String, path: String)
-> Result<(), Error>{
trace!("Recording output"); trace!("Recording output");
{ {
// We bind the command to this scope. Such that if one of the io blocks, we are sure that // We bind the command to this scope. Such that if one of the io blocks, we are sure that
...@@ -271,7 +275,7 @@ impl Scheduler { ...@@ -271,7 +275,7 @@ impl Scheduler {
} }
// We query the command // We query the command
let request = RequestMessages::RecordOutputRequest{parameters, features}; let request = RequestMessages::RecordOutputRequest{uuid, parameters, stdout, stderr, ecode, features, path};
match query_command!(sched, &request)?{ match query_command!(sched, &request)?{
ResponseMessages::RecordOutputResponse{} => Ok(()), ResponseMessages::RecordOutputResponse{} => Ok(()),
m => Err(Error::Message(format!("Unexpected message received {:?}", m))) m => Err(Error::Message(format!("Unexpected message received {:?}", m)))
...@@ -323,8 +327,8 @@ impl Scheduler { ...@@ -323,8 +327,8 @@ impl Scheduler {
/// Messages sent by the outer future to the resource inner thread, so as to start an operation. /// Messages sent by the outer future to the resource inner thread, so as to start an operation.
/// This contains the input of the operation if any. /// This contains the input of the operation if any.
enum OperationInput{ enum OperationInput{
RequestParameters, RequestParameters(String),
RecordOutput(String, String), RecordOutput(String, String, String, String, i32, String, String),
Shutdown, Shutdown,
} }
...@@ -396,9 +400,9 @@ impl SchedulerHandle { ...@@ -396,9 +400,9 @@ impl SchedulerHandle {
let _guard = span.enter(); let _guard = span.enter();
trace!(?operation, "Received operation"); trace!(?operation, "Received operation");
match operation { match operation {
OperationInput::RequestParameters => { OperationInput::RequestParameters(uuid) => {
spawner.spawn_local( spawner.spawn_local(
Scheduler::request_parameters(res.clone()) Scheduler::request_parameters(res.clone(), uuid)
.map(|a| { .map(|a| {
sender.send(OperationOutput::RequestParameters(a)) sender.send(OperationOutput::RequestParameters(a))
.map_err(|e| error!("Failed to \\ .map_err(|e| error!("Failed to \\
...@@ -408,9 +412,9 @@ impl SchedulerHandle { ...@@ -408,9 +412,9 @@ impl SchedulerHandle {
.instrument(span.clone()) .instrument(span.clone())
) )
} }
OperationInput::RecordOutput(parameters, features) => { OperationInput::RecordOutput(uuid, parameters, stdout, stderr, ecode, features, path) => {
spawner.spawn_local( spawner.spawn_local(
Scheduler::record_output(res.clone(), parameters, features) Scheduler::record_output(res.clone(), uuid, parameters, stdout, stderr, ecode, features, path)
.map(|a| { .map(|a| {
sender.send(OperationOutput::RecordOutput(a)) sender.send(OperationOutput::RecordOutput(a))
.map_err(|e| error!("Failed to \\ .map_err(|e| error!("Failed to \\
...@@ -473,12 +477,12 @@ impl SchedulerHandle { ...@@ -473,12 +477,12 @@ impl SchedulerHandle {
/// Async method, which request a parameter string from the scheduler, and wait for it if the /// Async method, which request a parameter string from the scheduler, and wait for it if the
/// scheduler is not yet ready. /// scheduler is not yet ready.
pub fn async_request_parameters(&self) -> impl Future<Output=Result<String,Error>> { pub fn async_request_parameters(&self, uuid: String) -> impl Future<Output=Result<String,Error>> {
let mut chan = self._sender.clone(); let mut chan = self._sender.clone();
async move { async move {
let (sender, receiver) = oneshot::channel(); let (sender, receiver) = oneshot::channel();
trace!("Sending async request parameters input"); trace!("Sending async request parameters input");
chan.send((sender, OperationInput::RequestParameters)) chan.send((sender, OperationInput::RequestParameters(uuid)))
.await .await
.map_err(|e| Error::Channel(e.to_string()))?; .map_err(|e| Error::Channel(e.to_string()))?;
trace!("Awaiting async request parameters output"); trace!("Awaiting async request parameters output");
...@@ -491,12 +495,14 @@ impl SchedulerHandle { ...@@ -491,12 +495,14 @@ impl SchedulerHandle {
} }
/// Async method, returning a future that ultimately resolves after the output was recorded. /// Async method, returning a future that ultimately resolves after the output was recorded.
pub fn async_record_output(&self, parameters: String, output: String) -> impl Future<Output=Result<(),Error>> { pub fn async_record_output(&self, uuid: String, parameters: String, stdout: String,
stderr: String, ecode: i32, features: String, path: String)
-> impl Future<Output=Result<(),Error>> {
let mut chan = self._sender.clone(); let mut chan = self._sender.clone();
async move { async move {
let (sender, receiver) = oneshot::channel(); let (sender, receiver) = oneshot::channel();
trace!("Sending async record output input"); trace!("Sending async record output input");
chan.send((sender, OperationInput::RecordOutput(parameters, output))) chan.send((sender, OperationInput::RecordOutput(uuid, parameters, stdout, stderr, ecode, features, path)))
.await .await
.map_err(|e| Error::Channel(e.to_string()))?; .map_err(|e| Error::Channel(e.to_string()))?;
trace!("Awaiting async record output output"); trace!("Awaiting async record output output");
...@@ -611,7 +617,11 @@ if __name__ == \"__main__\": ...@@ -611,7 +617,11 @@ if __name__ == \"__main__\":
elif \"RECORD_OUTPUT_REQUEST\" in inpt.keys(): elif \"RECORD_OUTPUT_REQUEST\" in inpt.keys():
sys.stderr.write(f\"Python received RECORD_OUTPUT_REQUEST {inpt}\\n\") sys.stderr.write(f\"Python received RECORD_OUTPUT_REQUEST {inpt}\\n\")
if inpt[\"RECORD_OUTPUT_REQUEST\"][\"parameters\"] != \"params_from_rust\": raise Exception() if inpt[\"RECORD_OUTPUT_REQUEST\"][\"parameters\"] != \"params_from_rust\": raise Exception()
if inpt[\"RECORD_OUTPUT_REQUEST\"][\"stdout\"] != \"stdout\": raise Exception()
if inpt[\"RECORD_OUTPUT_REQUEST\"][\"stderr\"] != \"stderr\": raise Exception()
if inpt[\"RECORD_OUTPUT_REQUEST\"][\"ecode\"] != 0: raise Exception()
if inpt[\"RECORD_OUTPUT_REQUEST\"][\"features\"] != '1.5': raise Exception() if inpt[\"RECORD_OUTPUT_REQUEST\"][\"features\"] != '1.5': raise Exception()
if inpt[\"RECORD_OUTPUT_REQUEST\"][\"path\"] != \".\": raise Exception()
sys.stdout.write(json.dumps({\"RECORD_OUTPUT_RESPONSE\": {}})) sys.stdout.write(json.dumps({\"RECORD_OUTPUT_RESPONSE\": {}}))
sys.stdout.write(\"\\n\") sys.stdout.write(\"\\n\")
elif \"SHUTDOWN_REQUEST\" in inpt.keys(): elif \"SHUTDOWN_REQUEST\" in inpt.keys():
...@@ -643,6 +653,10 @@ if __name__ == \"__main__\": ...@@ -643,6 +653,10 @@ if __name__ == \"__main__\":
elif \"RECORD_OUTPUT_REQUEST\" in inpt.keys(): elif \"RECORD_OUTPUT_REQUEST\" in inpt.keys():
sys.stderr.write(\"Python received RECORD_OUTPUT_REQUEST\\n\") sys.stderr.write(\"Python received RECORD_OUTPUT_REQUEST\\n\")
if inpt[\"RECORD_OUTPUT_REQUEST\"][\"parameters\"] != \"params_from_rust\": raise Exception() if inpt[\"RECORD_OUTPUT_REQUEST\"][\"parameters\"] != \"params_from_rust\": raise Exception()
if inpt[\"RECORD_OUTPUT_REQUEST\"][\"stdout\"] != \"stdout\": raise Exception()
if inpt[\"RECORD_OUTPUT_REQUEST\"][\"stderr\"] != \"stderr\": raise Exception()
if inpt[\"RECORD_OUTPUT_REQUEST\"][\"ecode\"] != 0: raise Exception()
if inpt[\"RECORD_OUTPUT_REQUEST\"][\"path\"] != \".\": raise Exception()
if inpt[\"RECORD_OUTPUT_REQUEST\"][\"features\"] != '1.5': raise Exception() if inpt[\"RECORD_OUTPUT_REQUEST\"][\"features\"] != '1.5': raise Exception()
print(json.dumps({\"ERROR_RESPONSE\": {\"message\": \"error_from_python\"}})) print(json.dumps({\"ERROR_RESPONSE\": {\"message\": \"error_from_python\"}}))
elif \"SHUTDOWN_REQUEST\" in inpt.keys(): elif \"SHUTDOWN_REQUEST\" in inpt.keys():
...@@ -673,10 +687,10 @@ if __name__ == \"__main__\": ...@@ -673,10 +687,10 @@ if __name__ == \"__main__\":
command.stderr(std::process::Stdio::inherit()); command.stderr(std::process::Stdio::inherit());
let scheduler = SchedulerHandle::spawn(command, "scheduler.py".into()).unwrap(); let scheduler = SchedulerHandle::spawn(command, "scheduler.py".into()).unwrap();
let parameters = block_on(scheduler.async_request_parameters()).unwrap(); let parameters = block_on(scheduler.async_request_parameters("hhh".into())).unwrap();
assert_eq!(parameters, format!("params_from_python")); assert_eq!(parameters, format!("params_from_python"));
block_on(scheduler.async_record_output("params_from_rust".into(), "1.5".into())).unwrap(); block_on(scheduler.async_record_output("hhh".into(), "params_from_rust".into(), "stdout".into(), "stderr".into(), 0, "1.5".into(), ".".into())).unwrap();
drop(scheduler); drop(scheduler);
...@@ -688,11 +702,9 @@ if __name__ == \"__main__\": ...@@ -688,11 +702,9 @@ if __name__ == \"__main__\":
command.stderr(std::process::Stdio::inherit()); command.stderr(std::process::Stdio::inherit());
let scheduler = SchedulerHandle::spawn(command, "scheduler.py".into()).unwrap(); let scheduler = SchedulerHandle::spawn(command, "scheduler.py".into()).unwrap();
block_on(scheduler.async_request_parameters()).unwrap_err(); block_on(scheduler.async_request_parameters("hhh".into())).unwrap_err();
block_on(scheduler.async_record_output("params_from_rust".into(), "1.5".into())).unwrap_err(); block_on(scheduler.async_record_output("hhh".into(), "params_from_rust".into(), "stdout".into(), "stderr".into(), 0, "1.5".into(), ".".into())).unwrap_err();
} }
......
...@@ -239,7 +239,8 @@ fn main(){ ...@@ -239,7 +239,8 @@ fn main(){
.help("File name of the script to be executed") .help("File name of the script to be executed")
.required(true)) .required(true))
.arg(clap::Arg::with_name("SCHEDULER") .arg(clap::Arg::with_name("SCHEDULER")
.help("Search command to use to schedule experiment parameters.")) .help("Search command to use to schedule experiment parameters.")
.required(true))
.arg(clap::Arg::with_name("verbose") .arg(clap::Arg::with_name("verbose")
.short("V") .short("V")
.long("verbose") .long("verbose")
......
...@@ -180,10 +180,16 @@ pub fn sched(matches: clap::ArgMatches<'static>) -> Result<Exit, Exit>{ ...@@ -180,10 +180,16 @@ pub fn sched(matches: clap::ArgMatches<'static>) -> Result<Exit, Exit>{
// Again, we make local copies. // Again, we make local copies.
let sched = sched.clone(); let sched = sched.clone();
let host = host.clone(); let host = host.clone();
let mut store = store;
// We generate an uuid
let id = uuid::Uuid::new_v4().hyphenated().to_string();
push_env(&mut store, "RUNAWAY_UUID", id.clone());
debug!("Execution id set to {}", format!("{}", id));
// We get the arguments // We get the arguments
info!("Querying the scheduler"); info!("Querying the scheduler");
let arguments: String = match sched.async_request_parameters().await{ let arguments: String = match sched.async_request_parameters(id.clone()).await{
Ok(arg) => Ok(arg), Ok(arg) => Ok(arg),
Err(liborchestra::scheduler::Error::Crashed) => Err(Exit::SchedulerCrashed), Err(liborchestra::scheduler::Error::Crashed) => Err(Exit::SchedulerCrashed),
Err(liborchestra::scheduler::Error::Shutdown) => Err(Exit::SchedulerShutdown), Err(liborchestra::scheduler::Error::Shutdown) => Err(Exit::SchedulerShutdown),
...@@ -193,18 +199,18 @@ pub fn sched(matches: clap::ArgMatches<'static>) -> Result<Exit, Exit>{ ...@@ -193,18 +199,18 @@ pub fn sched(matches: clap::ArgMatches<'static>) -> Result<Exit, Exit>{
// We acquire the node // We acquire the node
let node = to_exit!(host.async_acquire().await, Exit::NodeAcquisition)?; let node = to_exit!(host.async_acquire().await, Exit::NodeAcquisition)?;
let mut store = store;
store.extend(node.context.envs.clone().into_iter()); store.extend(node.context.envs.clone().into_iter());
debug!("Acquired node with context: \nCwd: {}\nEnvs:\n {}", debug!("Acquired node with context: \nCwd: {}\nEnvs:\n {}",
node.context.cwd.0.to_str().unwrap(), node.context.cwd.0.to_str().unwrap(),
format_env(&node.context.envs).replace("\n", "\n ") format_env(&node.context.envs).replace("\n", "\n ")
); );
Ok((arguments, node, store)) Ok((arguments, node, store, id))
}; };
// We execute this future and breaks if an error was encountered // We execute this future and breaks if an error was encountered
let (arguments, node, store) = match executor.run(arg_and_node_and_store_fut){ let (arguments, node, store, id) = match executor.run(arg_and_node_and_store_fut){
Ok(a) => a, Ok(a) => a,
Err(e) => break e Err(e) => break e
}; };
...@@ -213,7 +219,7 @@ pub fn sched(matches: clap::ArgMatches<'static>) -> Result<Exit, Exit>{ ...@@ -213,7 +219,7 @@ pub fn sched(matches: clap::ArgMatches<'static>) -> Result<Exit, Exit>{
let perform_fut = async move { let perform_fut = async move {
info!("Starting execution with arguments\"{}\"", arguments); info!("Starting execution with arguments\"{}\"", arguments);
// We perform the exec // We perform the exec
let (local_fetch_archive, store, remote_fetch_hash, execution_code) = perform_on_node( let (local_fetch_archive, store, remote_fetch_hash, output) = perform_on_node(
store, store,
node, node,
&host, &host,
...@@ -226,13 +232,21 @@ pub fn sched(matches: clap::ArgMatches<'static>) -> Result<Exit, Exit>{ ...@@ -226,13 +232,21 @@ pub fn sched(matches: clap::ArgMatches<'static>) -> Result<Exit, Exit>{
&fetch_include_globs, &fetch_include_globs,
matches.is_present("on-local"), matches.is_present("on-local"),
).await?; ).await?;
let ret = unpacks_fetch_post_proc(&matches, local_fetch_archive, store.clone(), remote_fetch_hash, execution_code); let ret = unpacks_fetch_post_proc(&matches, local_fetch_archive.clone(), store.clone(), remote_fetch_hash, output.ecode);
if let Some(EnvironmentValue(features)) = store.get(&EnvironmentKey("RUNAWAY_FEATURES".into())) { let features = match store.get(&EnvironmentKey("RUNAWAY_FEATURES".into())){
to_exit!(sched.async_record_output(arguments, features.to_string()).await, Exit::RecordFeatures)?; Some(EnvironmentValue(features)) => features.to_string(),
} else { None => "".to_owned()
error!("RUNAWAY_FEATURES was not set."); };
return Err(Exit::FeaturesNotSet); let path = local_fetch_archive
} .parent()
.unwrap()
.canonicalize()
.unwrap()
.to_str()
.unwrap()
.to_owned();
to_exit!(sched.async_record_output(id, arguments, output.stdout, output.stderr, output.ecode, features, path).await,
Exit::RecordFeatures)?;
ret ret
}; };
...@@ -375,19 +389,13 @@ async fn perform_on_node(store: EnvironmentStore, ...@@ -375,19 +389,13 @@ async fn perform_on_node(store: EnvironmentStore,
fetch_ignore_globs: &Vec<Glob<String>>, fetch_ignore_globs: &Vec<Glob<String>>,
fetch_include_globs: &Vec<Glob<String>>, fetch_include_globs: &Vec<Glob<String>>,
on_local: bool on_local: bool
) -> Result<(PathBuf, EnvironmentStore, Sha1Hash, i32), Exit>{ ) -> Result<(PathBuf, EnvironmentStore, Sha1Hash, OutputBuf), Exit>{
let mut store = store; let mut store = store;
push_env(&mut store, "RUNAWAY_ARGUMENTS", arguments); push_env(&mut store, "RUNAWAY_ARGUMENTS", arguments);
// We generate an uuid
let id = uuid::Uuid::new_v4().hyphenated().to_string();
push_env(&mut store, "RUNAWAY_UUID", id.clone());
debug!("Execution id set to {}", format!("{}", id));
// We generate the remote folder and unpack data into it // We generate the remote folder and unpack data into it
let remote_folder; let remote_folder;
if on_local{ if on_local{
...@@ -403,6 +411,8 @@ async fn perform_on_node(store: EnvironmentStore, ...@@ -403,6 +411,8 @@ async fn perform_on_node(store: EnvironmentStore,
&node &node
).await?; ).await?;
// We retrieve the id
let EnvironmentValue(id) = store.get(&EnvironmentKey("RUNAWAY_UUID".into())).unwrap().to_owned();
// We perform the job // We perform the job
debug!("Executing script"); debug!("Executing script");
...@@ -514,7 +524,7 @@ async fn perform_on_node(store: EnvironmentStore, ...@@ -514,7 +524,7 @@ async fn perform_on_node(store: EnvironmentStore,
// We return needed informations // We return needed informations
Ok((local_fetch_archive, execution_context.envs, remote_fetch_hash, out.ecode)) Ok((local_fetch_archive, execution_context.envs, remote_fetch_hash, out))
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment