diff --git a/liborchestra/src/lib/scheduler.rs b/liborchestra/src/lib/scheduler.rs index a4807a056ceabf55b6450f245e0bd031dafdb871..1185c334d1fd390aedcd355144de86d2793b5903 100644 --- a/liborchestra/src/lib/scheduler.rs +++ b/liborchestra/src/lib/scheduler.rs @@ -36,6 +36,7 @@ use tracing_futures::Instrument; use std::os::unix::process::CommandExt; use libc::{signal, SIGINT, SIG_IGN}; use std::process::Stdio; +use std::path::PathBuf; //----------------------------------------------------------------------------------------- MESSAGES @@ -46,11 +47,12 @@ use std::process::Stdio; /// This enumeration represents the different request messages that can be sent to the command. Those /// requests will be serialized to the following jsons when sent to the command stdin: pub enum RequestMessages{ - /// Example of json transcript: `{"GET_PARAMETERS_REQUEST": {}}` - GetParametersRequest{}, - /// Example of json transcript: `{"RECORD_OUTPUT_REQUEST": {"parametes": "some params", + /// Example of json transcript: `{"GET_PARAMETERS_REQUEST": {"uuid": "kkkagr23451"}}` + GetParametersRequest{ uuid: String }, + /// Example of json transcript: `{"RECORD_OUTPUT_REQUEST": {"uuid": "kkkagr23451", "parametes": + /// "some params", "stdout": "some mess", "stderr": "some mess", "ecode": 0, "path": "/home", /// "features": "[0.5, 0.5]"} }` - RecordOutputRequest{ parameters: String, features: String}, + RecordOutputRequest{ uuid: String, parameters: String, stdout: String, stderr: String, ecode: i32, features: String, path: String}, // Example of json transcript: `{"SHUTDOWN_REQUEST": {}}` ShutdownRequest{}, } @@ -76,7 +78,7 @@ pub enum ResponseMessages{ //------------------------------------------------------------------------------------------- MACROS -/// This macro allows to send a particular request to the scheduler, and retrieve the output +/// This macro allows to send a particular request to the scheduler, and retrieve the output. #[macro_export] macro_rules! query_command { ($sched: expr, $req: expr ) => { @@ -220,7 +222,7 @@ impl Scheduler { /// Inner future containing the logic to request parameters. #[instrument(name="Scheduler::request_parameters", skip(sched))] - async fn request_parameters(sched: Arc>) -> Result { + async fn request_parameters(sched: Arc>, uuid: String) -> Result { trace!("Requesting parameters"); loop{ let response = { @@ -236,7 +238,7 @@ impl Scheduler { } // We query the command - let request = RequestMessages::GetParametersRequest{}; + let request = RequestMessages::GetParametersRequest{uuid: uuid.clone()}; query_command!(sched, &request)? }; @@ -253,7 +255,9 @@ impl Scheduler { /// Inner future containing the logic to record an output. #[instrument(name="Scheduler::record_output", skip(sched))] - async fn record_output(sched: Arc>, parameters: String, features: String) -> Result<(), Error>{ + async fn record_output(sched: Arc>, uuid: String, parameters: String, + stdout: String, stderr: String, ecode: i32, features: String, path: String) + -> Result<(), Error>{ trace!("Recording output"); { // We bind the command to this scope. Such that if one of the io blocks, we are sure that @@ -271,7 +275,7 @@ impl Scheduler { } // We query the command - let request = RequestMessages::RecordOutputRequest{parameters, features}; + let request = RequestMessages::RecordOutputRequest{uuid, parameters, stdout, stderr, ecode, features, path}; match query_command!(sched, &request)?{ ResponseMessages::RecordOutputResponse{} => Ok(()), m => Err(Error::Message(format!("Unexpected message received {:?}", m))) @@ -323,8 +327,8 @@ impl Scheduler { /// Messages sent by the outer future to the resource inner thread, so as to start an operation. /// This contains the input of the operation if any. enum OperationInput{ - RequestParameters, - RecordOutput(String, String), + RequestParameters(String), + RecordOutput(String, String, String, String, i32, String, String), Shutdown, } @@ -396,9 +400,9 @@ impl SchedulerHandle { let _guard = span.enter(); trace!(?operation, "Received operation"); match operation { - OperationInput::RequestParameters => { + OperationInput::RequestParameters(uuid) => { spawner.spawn_local( - Scheduler::request_parameters(res.clone()) + Scheduler::request_parameters(res.clone(), uuid) .map(|a| { sender.send(OperationOutput::RequestParameters(a)) .map_err(|e| error!("Failed to \\ @@ -408,9 +412,9 @@ impl SchedulerHandle { .instrument(span.clone()) ) } - OperationInput::RecordOutput(parameters, features) => { + OperationInput::RecordOutput(uuid, parameters, stdout, stderr, ecode, features, path) => { spawner.spawn_local( - Scheduler::record_output(res.clone(), parameters, features) + Scheduler::record_output(res.clone(), uuid, parameters, stdout, stderr, ecode, features, path) .map(|a| { sender.send(OperationOutput::RecordOutput(a)) .map_err(|e| error!("Failed to \\ @@ -473,12 +477,12 @@ impl SchedulerHandle { /// Async method, which request a parameter string from the scheduler, and wait for it if the /// scheduler is not yet ready. - pub fn async_request_parameters(&self) -> impl Future> { + pub fn async_request_parameters(&self, uuid: String) -> impl Future> { let mut chan = self._sender.clone(); async move { let (sender, receiver) = oneshot::channel(); trace!("Sending async request parameters input"); - chan.send((sender, OperationInput::RequestParameters)) + chan.send((sender, OperationInput::RequestParameters(uuid))) .await .map_err(|e| Error::Channel(e.to_string()))?; trace!("Awaiting async request parameters output"); @@ -491,12 +495,14 @@ impl SchedulerHandle { } /// Async method, returning a future that ultimately resolves after the output was recorded. - pub fn async_record_output(&self, parameters: String, output: String) -> impl Future> { + pub fn async_record_output(&self, uuid: String, parameters: String, stdout: String, + stderr: String, ecode: i32, features: String, path: String) + -> impl Future> { let mut chan = self._sender.clone(); async move { let (sender, receiver) = oneshot::channel(); trace!("Sending async record output input"); - chan.send((sender, OperationInput::RecordOutput(parameters, output))) + chan.send((sender, OperationInput::RecordOutput(uuid, parameters, stdout, stderr, ecode, features, path))) .await .map_err(|e| Error::Channel(e.to_string()))?; trace!("Awaiting async record output output"); @@ -611,7 +617,11 @@ if __name__ == \"__main__\": elif \"RECORD_OUTPUT_REQUEST\" in inpt.keys(): sys.stderr.write(f\"Python received RECORD_OUTPUT_REQUEST {inpt}\\n\") if inpt[\"RECORD_OUTPUT_REQUEST\"][\"parameters\"] != \"params_from_rust\": raise Exception() + if inpt[\"RECORD_OUTPUT_REQUEST\"][\"stdout\"] != \"stdout\": raise Exception() + if inpt[\"RECORD_OUTPUT_REQUEST\"][\"stderr\"] != \"stderr\": raise Exception() + if inpt[\"RECORD_OUTPUT_REQUEST\"][\"ecode\"] != 0: raise Exception() if inpt[\"RECORD_OUTPUT_REQUEST\"][\"features\"] != '1.5': raise Exception() + if inpt[\"RECORD_OUTPUT_REQUEST\"][\"path\"] != \".\": raise Exception() sys.stdout.write(json.dumps({\"RECORD_OUTPUT_RESPONSE\": {}})) sys.stdout.write(\"\\n\") elif \"SHUTDOWN_REQUEST\" in inpt.keys(): @@ -643,6 +653,10 @@ if __name__ == \"__main__\": elif \"RECORD_OUTPUT_REQUEST\" in inpt.keys(): sys.stderr.write(\"Python received RECORD_OUTPUT_REQUEST\\n\") if inpt[\"RECORD_OUTPUT_REQUEST\"][\"parameters\"] != \"params_from_rust\": raise Exception() + if inpt[\"RECORD_OUTPUT_REQUEST\"][\"stdout\"] != \"stdout\": raise Exception() + if inpt[\"RECORD_OUTPUT_REQUEST\"][\"stderr\"] != \"stderr\": raise Exception() + if inpt[\"RECORD_OUTPUT_REQUEST\"][\"ecode\"] != 0: raise Exception() + if inpt[\"RECORD_OUTPUT_REQUEST\"][\"path\"] != \".\": raise Exception() if inpt[\"RECORD_OUTPUT_REQUEST\"][\"features\"] != '1.5': raise Exception() print(json.dumps({\"ERROR_RESPONSE\": {\"message\": \"error_from_python\"}})) elif \"SHUTDOWN_REQUEST\" in inpt.keys(): @@ -673,10 +687,10 @@ if __name__ == \"__main__\": command.stderr(std::process::Stdio::inherit()); let scheduler = SchedulerHandle::spawn(command, "scheduler.py".into()).unwrap(); - let parameters = block_on(scheduler.async_request_parameters()).unwrap(); + let parameters = block_on(scheduler.async_request_parameters("hhh".into())).unwrap(); assert_eq!(parameters, format!("params_from_python")); - block_on(scheduler.async_record_output("params_from_rust".into(), "1.5".into())).unwrap(); + block_on(scheduler.async_record_output("hhh".into(), "params_from_rust".into(), "stdout".into(), "stderr".into(), 0, "1.5".into(), ".".into())).unwrap(); drop(scheduler); @@ -688,11 +702,9 @@ if __name__ == \"__main__\": command.stderr(std::process::Stdio::inherit()); let scheduler = SchedulerHandle::spawn(command, "scheduler.py".into()).unwrap(); - block_on(scheduler.async_request_parameters()).unwrap_err(); + block_on(scheduler.async_request_parameters("hhh".into())).unwrap_err(); - block_on(scheduler.async_record_output("params_from_rust".into(), "1.5".into())).unwrap_err(); - - + block_on(scheduler.async_record_output("hhh".into(), "params_from_rust".into(), "stdout".into(), "stderr".into(), 0, "1.5".into(), ".".into())).unwrap_err(); } diff --git a/runaway-cli/src/main.rs b/runaway-cli/src/main.rs index 49c5d6ff7fe674951e821b0cdd3339fd345144d6..15998d617ce3eff08da0a5473d597fd9ee44139c 100644 --- a/runaway-cli/src/main.rs +++ b/runaway-cli/src/main.rs @@ -239,7 +239,8 @@ fn main(){ .help("File name of the script to be executed") .required(true)) .arg(clap::Arg::with_name("SCHEDULER") - .help("Search command to use to schedule experiment parameters.")) + .help("Search command to use to schedule experiment parameters.") + .required(true)) .arg(clap::Arg::with_name("verbose") .short("V") .long("verbose") diff --git a/runaway-cli/src/subcommands/sched.rs b/runaway-cli/src/subcommands/sched.rs index 6aed550824c0e504b4a811125583d63cbe3bcda8..55dbdfd13115a349b64b48114da697148f5c95dd 100644 --- a/runaway-cli/src/subcommands/sched.rs +++ b/runaway-cli/src/subcommands/sched.rs @@ -180,10 +180,16 @@ pub fn sched(matches: clap::ArgMatches<'static>) -> Result{ // Again, we make local copies. let sched = sched.clone(); let host = host.clone(); + let mut store = store; + + // We generate an uuid + let id = uuid::Uuid::new_v4().hyphenated().to_string(); + push_env(&mut store, "RUNAWAY_UUID", id.clone()); + debug!("Execution id set to {}", format!("{}", id)); // We get the arguments info!("Querying the scheduler"); - let arguments: String = match sched.async_request_parameters().await{ + let arguments: String = match sched.async_request_parameters(id.clone()).await{ Ok(arg) => Ok(arg), Err(liborchestra::scheduler::Error::Crashed) => Err(Exit::SchedulerCrashed), Err(liborchestra::scheduler::Error::Shutdown) => Err(Exit::SchedulerShutdown), @@ -193,18 +199,18 @@ pub fn sched(matches: clap::ArgMatches<'static>) -> Result{ // We acquire the node let node = to_exit!(host.async_acquire().await, Exit::NodeAcquisition)?; - let mut store = store; + store.extend(node.context.envs.clone().into_iter()); debug!("Acquired node with context: \nCwd: {}\nEnvs:\n {}", node.context.cwd.0.to_str().unwrap(), format_env(&node.context.envs).replace("\n", "\n ") ); - Ok((arguments, node, store)) + Ok((arguments, node, store, id)) }; // We execute this future and breaks if an error was encountered - let (arguments, node, store) = match executor.run(arg_and_node_and_store_fut){ + let (arguments, node, store, id) = match executor.run(arg_and_node_and_store_fut){ Ok(a) => a, Err(e) => break e }; @@ -213,7 +219,7 @@ pub fn sched(matches: clap::ArgMatches<'static>) -> Result{ let perform_fut = async move { info!("Starting execution with arguments\"{}\"", arguments); // We perform the exec - let (local_fetch_archive, store, remote_fetch_hash, execution_code) = perform_on_node( + let (local_fetch_archive, store, remote_fetch_hash, output) = perform_on_node( store, node, &host, @@ -226,13 +232,21 @@ pub fn sched(matches: clap::ArgMatches<'static>) -> Result{ &fetch_include_globs, matches.is_present("on-local"), ).await?; - let ret = unpacks_fetch_post_proc(&matches, local_fetch_archive, store.clone(), remote_fetch_hash, execution_code); - if let Some(EnvironmentValue(features)) = store.get(&EnvironmentKey("RUNAWAY_FEATURES".into())) { - to_exit!(sched.async_record_output(arguments, features.to_string()).await, Exit::RecordFeatures)?; - } else { - error!("RUNAWAY_FEATURES was not set."); - return Err(Exit::FeaturesNotSet); - } + let ret = unpacks_fetch_post_proc(&matches, local_fetch_archive.clone(), store.clone(), remote_fetch_hash, output.ecode); + let features = match store.get(&EnvironmentKey("RUNAWAY_FEATURES".into())){ + Some(EnvironmentValue(features)) => features.to_string(), + None => "".to_owned() + }; + let path = local_fetch_archive + .parent() + .unwrap() + .canonicalize() + .unwrap() + .to_str() + .unwrap() + .to_owned(); + to_exit!(sched.async_record_output(id, arguments, output.stdout, output.stderr, output.ecode, features, path).await, + Exit::RecordFeatures)?; ret }; @@ -375,19 +389,13 @@ async fn perform_on_node(store: EnvironmentStore, fetch_ignore_globs: &Vec>, fetch_include_globs: &Vec>, on_local: bool - ) -> Result<(PathBuf, EnvironmentStore, Sha1Hash, i32), Exit>{ + ) -> Result<(PathBuf, EnvironmentStore, Sha1Hash, OutputBuf), Exit>{ let mut store = store; push_env(&mut store, "RUNAWAY_ARGUMENTS", arguments); - // We generate an uuid - let id = uuid::Uuid::new_v4().hyphenated().to_string(); - push_env(&mut store, "RUNAWAY_UUID", id.clone()); - debug!("Execution id set to {}", format!("{}", id)); - - // We generate the remote folder and unpack data into it let remote_folder; if on_local{ @@ -403,6 +411,8 @@ async fn perform_on_node(store: EnvironmentStore, &node ).await?; + // We retrieve the id + let EnvironmentValue(id) = store.get(&EnvironmentKey("RUNAWAY_UUID".into())).unwrap().to_owned(); // We perform the job debug!("Executing script"); @@ -514,7 +524,7 @@ async fn perform_on_node(store: EnvironmentStore, // We return needed informations - Ok((local_fetch_archive, execution_context.envs, remote_fetch_hash, out.ecode)) + Ok((local_fetch_archive, execution_context.envs, remote_fetch_hash, out)) }