Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
PERE Alexandre
orchestra
Commits
bb772754
Commit
bb772754
authored
Dec 17, 2019
by
PERE Alexandre
Browse files
Merge branch 'develop' into '33-parametric-profiles-2'
# Conflicts: # liborchestra/src/lib/scheduler.rs
parents
15e36e36
c78b6b2d
Changes
3
Hide whitespace changes
Inline
Side-by-side
liborchestra/src/lib/scheduler.rs
View file @
bb772754
...
...
@@ -36,6 +36,7 @@ use tracing_futures::Instrument;
use
std
::
os
::
unix
::
process
::
CommandExt
;
use
libc
::{
signal
,
SIGINT
,
SIG_IGN
};
use
std
::
process
::
Stdio
;
use
std
::
path
::
PathBuf
;
//----------------------------------------------------------------------------------------- MESSAGES
...
...
@@ -46,11 +47,12 @@ use std::process::Stdio;
/// This enumeration represents the different request messages that can be sent to the command. Those
/// requests will be serialized to the following jsons when sent to the command stdin:
pub
enum
RequestMessages
{
/// Example of json transcript: `{"GET_PARAMETERS_REQUEST": {}}`
GetParametersRequest
{},
/// Example of json transcript: `{"RECORD_OUTPUT_REQUEST": {"parametes": "some params",
/// Example of json transcript: `{"GET_PARAMETERS_REQUEST": {"uuid": "kkkagr23451"}}`
GetParametersRequest
{
uuid
:
String
},
/// Example of json transcript: `{"RECORD_OUTPUT_REQUEST": {"uuid": "kkkagr23451", "parametes":
/// "some params", "stdout": "some mess", "stderr": "some mess", "ecode": 0, "path": "/home",
/// "features": "[0.5, 0.5]"} }`
RecordOutputRequest
{
parameters
:
String
,
features
:
String
},
RecordOutputRequest
{
uuid
:
String
,
parameters
:
String
,
stdout
:
String
,
stderr
:
String
,
ecode
:
i32
,
features
:
String
,
path
:
String
},
// Example of json transcript: `{"SHUTDOWN_REQUEST": {}}`
ShutdownRequest
{},
}
...
...
@@ -76,7 +78,7 @@ pub enum ResponseMessages{
//------------------------------------------------------------------------------------------- MACROS
/// This macro allows to send a particular request to the scheduler, and retrieve the output
/// This macro allows to send a particular request to the scheduler, and retrieve the output
.
#[macro_export]
macro_rules!
query_command
{
(
$sched
:
expr
,
$req
:
expr
)
=>
{
...
...
@@ -220,7 +222,7 @@ impl Scheduler {
/// Inner future containing the logic to request parameters.
#[instrument(name=
"Scheduler::request_parameters"
,
skip(sched))]
async
fn
request_parameters
(
sched
:
Arc
<
Mutex
<
Scheduler
>>
)
->
Result
<
String
,
Error
>
{
async
fn
request_parameters
(
sched
:
Arc
<
Mutex
<
Scheduler
>>
,
uuid
:
String
)
->
Result
<
String
,
Error
>
{
trace!
(
"Requesting parameters"
);
loop
{
let
response
=
{
...
...
@@ -236,7 +238,7 @@ impl Scheduler {
}
// We query the command
let
request
=
RequestMessages
::
GetParametersRequest
{};
let
request
=
RequestMessages
::
GetParametersRequest
{
uuid
:
uuid
.clone
()
};
query_command!
(
sched
,
&
request
)
?
};
...
...
@@ -253,7 +255,9 @@ impl Scheduler {
/// Inner future containing the logic to record an output.
#[instrument(name=
"Scheduler::record_output"
,
skip(sched))]
async
fn
record_output
(
sched
:
Arc
<
Mutex
<
Scheduler
>>
,
parameters
:
String
,
features
:
String
)
->
Result
<
(),
Error
>
{
async
fn
record_output
(
sched
:
Arc
<
Mutex
<
Scheduler
>>
,
uuid
:
String
,
parameters
:
String
,
stdout
:
String
,
stderr
:
String
,
ecode
:
i32
,
features
:
String
,
path
:
String
)
->
Result
<
(),
Error
>
{
trace!
(
"Recording output"
);
{
// We bind the command to this scope. Such that if one of the io blocks, we are sure that
...
...
@@ -271,7 +275,7 @@ impl Scheduler {
}
// We query the command
let
request
=
RequestMessages
::
RecordOutputRequest
{
parameters
,
features
};
let
request
=
RequestMessages
::
RecordOutputRequest
{
uuid
,
parameters
,
stdout
,
stderr
,
ecode
,
features
,
path
};
match
query_command!
(
sched
,
&
request
)
?
{
ResponseMessages
::
RecordOutputResponse
{}
=>
Ok
(()),
m
=>
Err
(
Error
::
Message
(
format!
(
"Unexpected message received {:?}"
,
m
)))
...
...
@@ -323,8 +327,8 @@ impl Scheduler {
/// Messages sent by the outer future to the resource inner thread, so as to start an operation.
/// This contains the input of the operation if any.
enum
OperationInput
{
RequestParameters
,
RecordOutput
(
String
,
String
),
RequestParameters
(
String
)
,
RecordOutput
(
String
,
String
,
String
,
String
,
i32
,
String
,
String
),
Shutdown
,
}
...
...
@@ -396,9 +400,9 @@ impl SchedulerHandle {
let
_guard
=
span
.enter
();
trace!
(
?
operation
,
"Received operation"
);
match
operation
{
OperationInput
::
RequestParameters
=>
{
OperationInput
::
RequestParameters
(
uuid
)
=>
{
spawner
.spawn_local
(
Scheduler
::
request_parameters
(
res
.clone
())
Scheduler
::
request_parameters
(
res
.clone
()
,
uuid
)
.map
(|
a
|
{
sender
.send
(
OperationOutput
::
RequestParameters
(
a
))
.map_err
(|
e
|
error!
(
"Failed to
\\
...
...
@@ -408,9 +412,9 @@ impl SchedulerHandle {
.instrument
(
span
.clone
())
)
}
OperationInput
::
RecordOutput
(
parameters
,
features
)
=>
{
OperationInput
::
RecordOutput
(
uuid
,
parameters
,
stdout
,
stderr
,
ecode
,
features
,
path
)
=>
{
spawner
.spawn_local
(
Scheduler
::
record_output
(
res
.clone
(),
parameters
,
features
)
Scheduler
::
record_output
(
res
.clone
(),
uuid
,
parameters
,
stdout
,
stderr
,
ecode
,
features
,
path
)
.map
(|
a
|
{
sender
.send
(
OperationOutput
::
RecordOutput
(
a
))
.map_err
(|
e
|
error!
(
"Failed to
\\
...
...
@@ -473,12 +477,12 @@ impl SchedulerHandle {
/// Async method, which request a parameter string from the scheduler, and wait for it if the
/// scheduler is not yet ready.
pub
fn
async_request_parameters
(
&
self
)
->
impl
Future
<
Output
=
Result
<
String
,
Error
>>
{
pub
fn
async_request_parameters
(
&
self
,
uuid
:
String
)
->
impl
Future
<
Output
=
Result
<
String
,
Error
>>
{
let
mut
chan
=
self
._sender
.clone
();
async
move
{
let
(
sender
,
receiver
)
=
oneshot
::
channel
();
trace!
(
"Sending async request parameters input"
);
chan
.send
((
sender
,
OperationInput
::
RequestParameters
))
chan
.send
((
sender
,
OperationInput
::
RequestParameters
(
uuid
)
))
.await
.map_err
(|
e
|
Error
::
Channel
(
e
.to_string
()))
?
;
trace!
(
"Awaiting async request parameters output"
);
...
...
@@ -491,12 +495,14 @@ impl SchedulerHandle {
}
/// Async method, returning a future that ultimately resolves after the output was recorded.
pub
fn
async_record_output
(
&
self
,
parameters
:
String
,
output
:
String
)
->
impl
Future
<
Output
=
Result
<
(),
Error
>>
{
pub
fn
async_record_output
(
&
self
,
uuid
:
String
,
parameters
:
String
,
stdout
:
String
,
stderr
:
String
,
ecode
:
i32
,
features
:
String
,
path
:
String
)
->
impl
Future
<
Output
=
Result
<
(),
Error
>>
{
let
mut
chan
=
self
._sender
.clone
();
async
move
{
let
(
sender
,
receiver
)
=
oneshot
::
channel
();
trace!
(
"Sending async record output input"
);
chan
.send
((
sender
,
OperationInput
::
RecordOutput
(
parameters
,
output
)))
chan
.send
((
sender
,
OperationInput
::
RecordOutput
(
uuid
,
parameters
,
stdout
,
stderr
,
ecode
,
features
,
path
)))
.await
.map_err
(|
e
|
Error
::
Channel
(
e
.to_string
()))
?
;
trace!
(
"Awaiting async record output output"
);
...
...
@@ -611,7 +617,11 @@ if __name__ == \"__main__\":
elif
\"
RECORD_OUTPUT_REQUEST
\"
in inpt.keys():
sys.stderr.write(f
\"
Python received RECORD_OUTPUT_REQUEST {inpt}
\\
n
\"
)
if inpt[
\"
RECORD_OUTPUT_REQUEST
\"
][
\"
parameters
\"
] !=
\"
params_from_rust
\"
: raise Exception()
if inpt[
\"
RECORD_OUTPUT_REQUEST
\"
][
\"
stdout
\"
] !=
\"
stdout
\"
: raise Exception()
if inpt[
\"
RECORD_OUTPUT_REQUEST
\"
][
\"
stderr
\"
] !=
\"
stderr
\"
: raise Exception()
if inpt[
\"
RECORD_OUTPUT_REQUEST
\"
][
\"
ecode
\"
] != 0: raise Exception()
if inpt[
\"
RECORD_OUTPUT_REQUEST
\"
][
\"
features
\"
] != '1.5': raise Exception()
if inpt[
\"
RECORD_OUTPUT_REQUEST
\"
][
\"
path
\"
] !=
\"
.
\"
: raise Exception()
sys.stdout.write(json.dumps({
\"
RECORD_OUTPUT_RESPONSE
\"
: {}}))
sys.stdout.write(
\"\\
n
\"
)
elif
\"
SHUTDOWN_REQUEST
\"
in inpt.keys():
...
...
@@ -643,6 +653,10 @@ if __name__ == \"__main__\":
elif
\"
RECORD_OUTPUT_REQUEST
\"
in inpt.keys():
sys.stderr.write(
\"
Python received RECORD_OUTPUT_REQUEST
\\
n
\"
)
if inpt[
\"
RECORD_OUTPUT_REQUEST
\"
][
\"
parameters
\"
] !=
\"
params_from_rust
\"
: raise Exception()
if inpt[
\"
RECORD_OUTPUT_REQUEST
\"
][
\"
stdout
\"
] !=
\"
stdout
\"
: raise Exception()
if inpt[
\"
RECORD_OUTPUT_REQUEST
\"
][
\"
stderr
\"
] !=
\"
stderr
\"
: raise Exception()
if inpt[
\"
RECORD_OUTPUT_REQUEST
\"
][
\"
ecode
\"
] != 0: raise Exception()
if inpt[
\"
RECORD_OUTPUT_REQUEST
\"
][
\"
path
\"
] !=
\"
.
\"
: raise Exception()
if inpt[
\"
RECORD_OUTPUT_REQUEST
\"
][
\"
features
\"
] != '1.5': raise Exception()
print(json.dumps({
\"
ERROR_RESPONSE
\"
: {
\"
message
\"
:
\"
error_from_python
\"
}}))
elif
\"
SHUTDOWN_REQUEST
\"
in inpt.keys():
...
...
@@ -673,10 +687,10 @@ if __name__ == \"__main__\":
command
.stderr
(
std
::
process
::
Stdio
::
inherit
());
let
scheduler
=
SchedulerHandle
::
spawn
(
command
,
"scheduler.py"
.into
())
.unwrap
();
let
parameters
=
block_on
(
scheduler
.async_request_parameters
())
.unwrap
();
let
parameters
=
block_on
(
scheduler
.async_request_parameters
(
"hhh"
.into
()
))
.unwrap
();
assert_eq!
(
parameters
,
format!
(
"params_from_python"
));
block_on
(
scheduler
.async_record_output
(
"params_from_rust"
.into
(),
"
1.5
"
.into
()))
.unwrap
();
block_on
(
scheduler
.async_record_output
(
"hhh"
.into
(),
"params_from_rust"
.into
(),
"
stdout"
.into
(),
"stderr"
.into
(),
0
,
"1.5"
.into
(),
".
"
.into
()))
.unwrap
();
drop
(
scheduler
);
...
...
@@ -688,9 +702,9 @@ if __name__ == \"__main__\":
command
.stderr
(
std
::
process
::
Stdio
::
inherit
());
let
scheduler
=
SchedulerHandle
::
spawn
(
command
,
"scheduler.py"
.into
())
.unwrap
();
block_on
(
scheduler
.async_request_parameters
())
.unwrap_err
();
block_on
(
scheduler
.async_request_parameters
(
"hhh"
.into
()
))
.unwrap_err
();
block_on
(
scheduler
.async_record_output
(
"params_from_rust"
.into
(),
"
1.5
"
.into
()))
.unwrap_err
();
block_on
(
scheduler
.async_record_output
(
"hhh"
.into
(),
"params_from_rust"
.into
(),
"
stdout"
.into
(),
"stderr"
.into
(),
0
,
"1.5"
.into
(),
".
"
.into
()))
.unwrap_err
();
}
...
...
runaway-cli/src/main.rs
View file @
bb772754
...
...
@@ -239,7 +239,8 @@ fn main(){
.help
(
"File name of the script to be executed"
)
.required
(
true
))
.arg
(
clap
::
Arg
::
with_name
(
"SCHEDULER"
)
.help
(
"Search command to use to schedule experiment parameters."
))
.help
(
"Search command to use to schedule experiment parameters."
)
.required
(
true
))
.arg
(
clap
::
Arg
::
with_name
(
"verbose"
)
.short
(
"V"
)
.long
(
"verbose"
)
...
...
runaway-cli/src/subcommands/sched.rs
View file @
bb772754
...
...
@@ -180,10 +180,16 @@ pub fn sched(matches: clap::ArgMatches<'static>) -> Result<Exit, Exit>{
// Again, we make local copies.
let
sched
=
sched
.clone
();
let
host
=
host
.clone
();
let
mut
store
=
store
;
// We generate an uuid
let
id
=
uuid
::
Uuid
::
new_v4
()
.hyphenated
()
.to_string
();
push_env
(
&
mut
store
,
"RUNAWAY_UUID"
,
id
.clone
());
debug!
(
"Execution id set to {}"
,
format!
(
"{}"
,
id
));
// We get the arguments
info!
(
"Querying the scheduler"
);
let
arguments
:
String
=
match
sched
.async_request_parameters
()
.await
{
let
arguments
:
String
=
match
sched
.async_request_parameters
(
id
.clone
()
)
.await
{
Ok
(
arg
)
=>
Ok
(
arg
),
Err
(
liborchestra
::
scheduler
::
Error
::
Crashed
)
=>
Err
(
Exit
::
SchedulerCrashed
),
Err
(
liborchestra
::
scheduler
::
Error
::
Shutdown
)
=>
Err
(
Exit
::
SchedulerShutdown
),
...
...
@@ -193,18 +199,18 @@ pub fn sched(matches: clap::ArgMatches<'static>) -> Result<Exit, Exit>{
// We acquire the node
let
node
=
to_exit!
(
host
.async_acquire
()
.await
,
Exit
::
NodeAcquisition
)
?
;
let
mut
store
=
store
;
store
.extend
(
node
.context.envs
.clone
()
.into_iter
());
debug!
(
"Acquired node with context:
\n
Cwd: {}
\n
Envs:
\n
{}"
,
node
.context.cwd
.0
.to_str
()
.unwrap
(),
format_env
(
&
node
.context.envs
)
.replace
(
"
\n
"
,
"
\n
"
)
);
Ok
((
arguments
,
node
,
store
))
Ok
((
arguments
,
node
,
store
,
id
))
};
// We execute this future and breaks if an error was encountered
let
(
arguments
,
node
,
store
)
=
match
executor
.run
(
arg_and_node_and_store_fut
){
let
(
arguments
,
node
,
store
,
id
)
=
match
executor
.run
(
arg_and_node_and_store_fut
){
Ok
(
a
)
=>
a
,
Err
(
e
)
=>
break
e
};
...
...
@@ -213,7 +219,7 @@ pub fn sched(matches: clap::ArgMatches<'static>) -> Result<Exit, Exit>{
let
perform_fut
=
async
move
{
info!
(
"Starting execution with arguments
\"
{}
\"
"
,
arguments
);
// We perform the exec
let
(
local_fetch_archive
,
store
,
remote_fetch_hash
,
execution_code
)
=
perform_on_node
(
let
(
local_fetch_archive
,
store
,
remote_fetch_hash
,
output
)
=
perform_on_node
(
store
,
node
,
&
host
,
...
...
@@ -226,13 +232,21 @@ pub fn sched(matches: clap::ArgMatches<'static>) -> Result<Exit, Exit>{
&
fetch_include_globs
,
matches
.is_present
(
"on-local"
),
)
.await
?
;
let
ret
=
unpacks_fetch_post_proc
(
&
matches
,
local_fetch_archive
,
store
.clone
(),
remote_fetch_hash
,
execution_code
);
if
let
Some
(
EnvironmentValue
(
features
))
=
store
.get
(
&
EnvironmentKey
(
"RUNAWAY_FEATURES"
.into
()))
{
to_exit!
(
sched
.async_record_output
(
arguments
,
features
.to_string
())
.await
,
Exit
::
RecordFeatures
)
?
;
}
else
{
error!
(
"RUNAWAY_FEATURES was not set."
);
return
Err
(
Exit
::
FeaturesNotSet
);
}
let
ret
=
unpacks_fetch_post_proc
(
&
matches
,
local_fetch_archive
.clone
(),
store
.clone
(),
remote_fetch_hash
,
output
.ecode
);
let
features
=
match
store
.get
(
&
EnvironmentKey
(
"RUNAWAY_FEATURES"
.into
())){
Some
(
EnvironmentValue
(
features
))
=>
features
.to_string
(),
None
=>
""
.to_owned
()
};
let
path
=
local_fetch_archive
.parent
()
.unwrap
()
.canonicalize
()
.unwrap
()
.to_str
()
.unwrap
()
.to_owned
();
to_exit!
(
sched
.async_record_output
(
id
,
arguments
,
output
.stdout
,
output
.stderr
,
output
.ecode
,
features
,
path
)
.await
,
Exit
::
RecordFeatures
)
?
;
ret
};
...
...
@@ -375,19 +389,13 @@ async fn perform_on_node(store: EnvironmentStore,
fetch_ignore_globs
:
&
Vec
<
Glob
<
String
>>
,
fetch_include_globs
:
&
Vec
<
Glob
<
String
>>
,
on_local
:
bool
)
->
Result
<
(
PathBuf
,
EnvironmentStore
,
Sha1Hash
,
i32
),
Exit
>
{
)
->
Result
<
(
PathBuf
,
EnvironmentStore
,
Sha1Hash
,
OutputBuf
),
Exit
>
{
let
mut
store
=
store
;
push_env
(
&
mut
store
,
"RUNAWAY_ARGUMENTS"
,
arguments
);
// We generate an uuid
let
id
=
uuid
::
Uuid
::
new_v4
()
.hyphenated
()
.to_string
();
push_env
(
&
mut
store
,
"RUNAWAY_UUID"
,
id
.clone
());
debug!
(
"Execution id set to {}"
,
format!
(
"{}"
,
id
));
// We generate the remote folder and unpack data into it
let
remote_folder
;
if
on_local
{
...
...
@@ -403,6 +411,8 @@ async fn perform_on_node(store: EnvironmentStore,
&
node
)
.await
?
;
// We retrieve the id
let
EnvironmentValue
(
id
)
=
store
.get
(
&
EnvironmentKey
(
"RUNAWAY_UUID"
.into
()))
.unwrap
()
.to_owned
();
// We perform the job
debug!
(
"Executing script"
);
...
...
@@ -514,7 +524,7 @@ async fn perform_on_node(store: EnvironmentStore,
// We return needed informations
Ok
((
local_fetch_archive
,
execution_context
.envs
,
remote_fetch_hash
,
out
.ecode
))
Ok
((
local_fetch_archive
,
execution_context
.envs
,
remote_fetch_hash
,
out
))
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment