Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Commits on Source (60)
Showing 225 additions and 195 deletions
......@@ -35,7 +35,7 @@ along with Execo. If not, see <http://www.gnu.org/licenses/>
Versions
========
latest stable version: v2.6.8 (2021-11-17)
latest stable version: v2.8.2 (2025-03-18)
Installation instructions
=========================
......@@ -134,8 +134,8 @@ Bugs
Execo is regularly used to perform advanced experiments and
administration / monitoring tasks, in and outside Grid5000. We
actively fix bugs. Bugs should be reported to http://bugzilla.inria.fr
(product: execo)
actively fix bugs. Bugs should be reported to
https://gitlab.inria.fr/mimbert/execo/-/issues
Publications
============
......
#!/usr/bin/env python
#!/usr/bin/env python3
import optparse
from execo import sleep
......@@ -9,9 +9,9 @@ def get_all_jobs():
def show_jobs(jobs):
for site in get_g5k_sites():
print "site: %s" % site
print("site: %s" % site)
for job_id in [ job_id for (job_id, job_site) in jobs if job_site == site ]:
print " %s" % job_id
print(" %s" % job_id)
if __name__ == "__main__":
......@@ -27,11 +27,11 @@ if __name__ == "__main__":
retry = True
if len(jobs) > 0:
while retry and len(jobs) > 0:
print "deleting..."
print("deleting...")
oardel(jobs)
if options.force:
sleep(5)
jobs = get_all_jobs()
else:
retry = False
print "...done"
print("...done")
#!/usr/bin/env python
#!/usr/bin/env python3
from execo import Remote
from execo_g5k import get_g5k_sites
......
#!/usr/bin/env python
#!/usr/bin/env python3
import execo, execo_g5k, execo_g5k.api_utils
......@@ -6,6 +6,6 @@ sites = execo_g5k.api_utils.get_g5k_sites()
all_jobs = execo_g5k.get_current_oar_jobs(sites, abort_on_error = False)
for site in execo_g5k.api_utils.get_g5k_sites():
print "site: %s" % site
print("site: %s" % site)
for job_id in [ job_id for (job_id, job_site) in all_jobs if job_site == site ]:
print " %s" % job_id
print(" %s" % job_id)
#!/usr/bin/env python
#!/usr/bin/env python3
from execo_g5k.api_utils import *
for site in get_g5k_sites():
print "site: %s" % site
print("site: %s" % site)
for cluster in get_site_clusters(site):
print " %s" % cluster
print(" %s" % cluster)
# for host in get_cluster_hosts(cluster):
# print " %s" % host
python-execo (2.8.2) unstable; urgency=medium
* [execo_g5k] planning: fix exclusion of hosts/cluster additional constraint
* [execo_g5k] api_utils: get_cluster_site() returns None for an unknown cluster instead of raising an exception (to be consistent with the rest of the API, e.g. get_host_cluster())
* [execo, execo_g5k] fix escape sequences (in regexes or in strings)
* [execo, execo_g5k] use pipes.quote or shlex.quote depending on py version
* [doc] fix bug reporting url
* [execo] utils: singleton_to_collection better None handling -> empty list
* [execo_g5k] oar: add -t origin=execo to all jobs
* [execo_g5k] remove oargridsub (but the examples in the user guide still need to be updated)
-- Matthieu Imbert <matthieu.imbert@inria.fr> Tue, 18 Mar 2025 13:09:22 +0100
python-execo (2.8.1) unstable; urgency=medium
* [execo, execo_g5k, execo_engine] fix regexes thanks to python 3.12 improved handling of invalid escape sequences (https://docs.python.org/3/whatsnew/3.12.html#other-language-changes)
* [execo_engine] add option for using a pty to copy_outputs(), useful for having working cmdline completion in ipdb when ipdb is spawned from an execo script
* [execo] Remote: fix corner case in process args handling: passing None to a Remote's Process args stdout/err handler caused an exception due to an unhandled special case in substitutions
* [execo_g5k] Kaconsole: fix default prompt regex
* [execo_g5k] Add Kaconsole to documentation
-- Matthieu Imbert <matthieu.imbert@inria.fr> Mon, 21 Oct 2024 09:22:37 +0200
python-execo (2.8) unstable; urgency=medium
* [packaging] git-based version numbers compliant with recent distutils
* [execo_g5k] api utils: fix APIConnection handling of http return codes
* [execo_g5k] oar: adapt to changes in oarstat output format
* fix again version naming for compliance with pep 440
* [execo_g5k] g5k api cache: stored as json instead of pickle
* [execo_g5k] compatibility with old and new oarstat output formats
* [execo_g5k] support cluster names ending with numbers
* [execo_g5k] canonical_host_name: handle interface, kavlan, ipv6 + support cluster names ending with numbers
* [execo_g5k] get_host_longname: correct behaviour when site unknown
* [execo_g5k] planning: add include_hosts flag to compute_slots()
* [execo_g5k] planning: rewrite of get_jobs_specs to handle individual hosts
* [packaging] remove v prefix from version for package naming
* [execo] Process: lifecycle handlers in separate threads to avoid blocking + refactoring
* [execo] Process: handle encoding (py3+) when writing to Process
* [execo] Process: expect improvements
  - expect handler constructor takes process to be able to scan at init
  - scanning code factorized between stdin / stderr events and constructor
  - debug log
  - allow passing an explicit expect_output_handler (instead of the default automatic thread local one) to the expect method for more control in specific situations
  - protect from uninitialized thread local expect output handler
* [execo_g5k] api_utils
  - canonical_host_name: fix for ifname != ethX (eg. fpgaX)
  - add get_host_interface
* [execo_g5k] add get_cluster_queues, get_cluster_jobtypes
* [execo_g5k] kadeploy: add KaconsoleProcess
* [execo] Process: ExpectOutputHandler debug log
* [remote] add substitutions to filenames in stdout/stderr handlers
* [execo_g5k] KaConsoleProcess: add close()
* [execo_g5k] planning: log level warn instead of detail for errors while getting data from oar db with sql
* [execo_engine] add an optional separator to slugify
* [execo] ExpectOutputHandler: improve debug logging
* [execo] ProcessOuputHandler: rename member var
* [execo] Get, Put: scp command as list and shell=False to (securely) handle spaces in path
* [execo] full redesign of the expect implementation
* [execo_g5k] fix KaConsoleProcess
* [execo] conductor: fix eating 100% of one core iterating through high number of fd to close them
-- Matthieu Imbert <matthieu.imbert@inria.fr> Tue, 11 Jun 2024 16:00:16 +0200
python-execo (2.7) unstable; urgency=low
* new upstream release:
- [execo] bugfixes of ProcessOutputHandlers, with a slight API change, hence the switch to the 2.7 series
- [documentation] update doc and examples to python3
- [execo_g5k] no more allow_classic_ssh in examples (grid5000 doesn't have this job type anymore)
- [documentation] various minor improvements and updates to examples
- [execo_g5k] fix g5k API cache validity check
-- Matthieu Imbert <matthieu.imbert@inria.fr> Wed, 25 Jan 2023 10:58:00 +0100
python-execo (2.6.8) unstable; urgency=low
* new upstream release:
......
......@@ -2,11 +2,11 @@ Format: http://www.debian.org/doc/packaging-manuals/copyright-format/1.0/
Upstream-Name: execo
Upstream-Contact: <matthieu.imbert@inria.fr>
Source: https://gitlab.inria.fr/mimbert/execo
Copyright: 2009-2021 INRIA Rhone-Alpes, Service Experimentation et Developpement
Copyright: 2009-2025 INRIA Rhone-Alpes, Service Experimentation et Developpement
License: GPL-3+
Files: *
Copyright: 2009-2021 INRIA Rhone-Alpes, Service Experimentation et Developpement
Copyright: 2009-2025 INRIA Rhone-Alpes, Service Experimentation et Developpement
License: GPL-3+
License: GPL-3+
......
......@@ -4,7 +4,9 @@ from execo_g5k import *
logger.info("compute resources to reserve")
blacklisted = [ "graphite", "reims", "helios-6.sophia.grid5000.fr",
"helios-42.sophia.grid5000.fr", "helios-44.sophia.grid5000.fr",
"sol-21.sophia.grid5000.fr", "suno-3.sophia.grid5000.fr" ]
"sol-21.sophia.grid5000.fr", "suno-3.sophia.grid5000.fr",
"grouille", "neowise", "pyxis", "drac", "servan", "troll", "yeti",
"sirius" ]
planning = get_planning()
slots = compute_slots(planning, 60*10, excluded_elements = blacklisted)
wanted = {'grid5000': 0}
......
......@@ -11,11 +11,8 @@ if jobid:
nodes = get_oar_job_nodes(jobid, site)
# group nodes by cluster
sources, targets = [ list(n) for c, n in itertools.groupby(
sorted(nodes,
lambda n1, n2: cmp(
get_host_cluster(n1),
get_host_cluster(n2))),
get_host_cluster) ]
sorted(nodes, key=get_host_cluster),
get_host_cluster) ]
servers = Remote("iperf -s",
targets,
connection_params = default_oarsh_oarcp_params)
......
......@@ -11,8 +11,7 @@ actual_resources = distribute_hosts(resources, wanted, ratio = 0.8)
job_specs = get_jobs_specs(actual_resources)
logger.info("try to reserve " + str(actual_resources))
jobs = oarsubgrid(job_specs, start_date,
walltime = end_date - start_date,
job_type = "allow_classic_ssh")
walltime = end_date - start_date)
if len(jobs) > 0:
try:
logger.info("wait jobs start")
......@@ -24,7 +23,7 @@ if len(jobs) > 0:
cores = []
nodes = sorted(nodes, key=get_host_cluster)
for cluster, cluster_nodes in itertools.groupby(nodes, key=get_host_cluster):
num_cores = get_host_attributes(cluster + "-1")["architecture"]["nb_cores"]
num_cores = get_host_attributes(get_cluster_hosts(cluster)[0])["architecture"]["nb_cores"]
for node in cluster_nodes:
cores += [ node ] * num_cores
logger.info("for a total of %i cores" % (len(cores),))
......
......@@ -27,9 +27,9 @@ class g5k_tcp_congestion(Engine):
if cluster in get_g5k_clusters() and n_nodes > 0
and 1e9 ==
[adapter
for adapter in get_host_attributes(cluster + "-1")["network_adapters"]
for adapter in get_host_attributes(get_cluster_hosts(cluster)[0])["network_adapters"]
if adapter.get("network_address") ==
cluster + "-1." + get_cluster_site(cluster) + ".grid5000.fr"][0]["rate"]],
get_cluster_hosts(cluster)[0] + "." + get_cluster_site(cluster) + ".grid5000.fr"][0]["rate"]],
key = get_cluster_site),
get_cluster_site) }.items())[0:2])
if len(actual_resources) >= 2:
......@@ -47,7 +47,7 @@ class g5k_tcp_congestion(Engine):
nodes = get_oargrid_job_nodes(jobid)
if len(nodes) != 2: raise Exception("not enough nodes")
logger.info("deploy %i nodes" % (len(nodes),))
deployed, undeployed = deploy(Deployment(nodes, env_name = "jessie-x64-min"))
deployed, undeployed = deploy(Deployment(nodes, env_name = "debian11-min"))
logger.info("%i deployed, %i undeployed" % (len(deployed), len(undeployed)))
if len(deployed) != 2: raise Exception("not enough deployed nodes")
logger.info("prepare nodes")
......@@ -65,9 +65,9 @@ class g5k_tcp_congestion(Engine):
sleep(2)
sources.run()
if comb["num_flows"] > 1:
pattern = "^\[SUM\].*\s(\d+(\.\d+)?) (\w?)bits/sec"
pattern = r'^\[SUM\].*\s(\d+(\.\d+)?) (\w?)bits/sec'
else:
pattern = "^\[\s*\d+\].*\s(\d+(\.\d+)?) (\w?)bits/sec"
pattern = r'^\[\s*\d+\].*\s(\d+(\.\d+)?) (\w?)bits/sec'
bw_mo = re.search(pattern, sources.stdout, re.MULTILINE)
if bw_mo:
bw = float(bw_mo.group(1)) * {"": 1, "K": 1e3, "M": 1e6, "G": 1e9}[bw_mo.group(3)]
......
......@@ -7,7 +7,7 @@ from scipy import stats
if __name__ == "__main__":
fname = sys.argv[1]
with open(fname, "r") as f:
results = yaml.load(f)
results = yaml.load(f, Loader=yaml.UnsafeLoader)
arranged_results = {
tcp_congestion_control:
{ num_flows:
......@@ -30,7 +30,7 @@ if __name__ == "__main__":
label = tcp_congestion_control, linewidth=0.5)
offset += .36
plt.xlim(0, max(arranged_results[tcp_congestion_control]) + 1)
plt.xticks(arranged_results[tcp_congestion_control])
#plt.xticks(arranged_results[tcp_congestion_control])
plt.legend(loc = 'lower right')
plt.xlabel('num flows')
plt.ylabel('bandwith bits/s')
......
......@@ -8,7 +8,7 @@ logger.info("currently running oar jobs " + str(running_jobs))
logger.info("get job nodes")
nodes = [ job_nodes for job in running_jobs for job_nodes in get_oar_job_nodes(*job) ]
logger.info("deploying %i nodes" % (len(nodes),))
deployed, undeployed = deploy(Deployment(nodes, env_name = "wheezy-x64-min"))
deployed, undeployed = deploy(Deployment(nodes, env_name = "debian11-min"))
logger.info("%i deployed, %i undeployed" % (len(deployed), len(undeployed)))
if len(deployed) >= 2:
sources = list(deployed)[0:1]
......
......@@ -40,7 +40,7 @@ master_doc = 'index'
# General information about the project.
project = 'execo'
copyright = '2009-2021, INRIA Rhone-Alpes, Service Experimentation et Developpement'
copyright = '2009-2025, INRIA Rhone-Alpes, Service Experimentation et Developpement'
# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
......
......@@ -60,37 +60,6 @@ oarsubgrid
----------
.. autofunction:: execo_g5k.oar.oarsubgrid
OARGRID functions
=================
oargridsub
----------
.. autofunction:: execo_g5k.oargrid.oargridsub
oargriddel
----------
.. autofunction:: execo_g5k.oargrid.oargriddel
get_current_oargrid_jobs
------------------------
.. autofunction:: execo_g5k.oargrid.get_current_oargrid_jobs
get_oargrid_job_info
--------------------
.. autofunction:: execo_g5k.oargrid.get_oargrid_job_info
get_oargrid_job_oar_jobs
------------------------
.. autofunction:: execo_g5k.oargrid.get_oargrid_job_oar_jobs
wait_oargrid_job_start
----------------------
.. autofunction:: execo_g5k.oargrid.wait_oargrid_job_start
get_oargrid_job_nodes
---------------------
.. autofunction:: execo_g5k.oargrid.get_oargrid_job_nodes
kadeploy3
=========
......@@ -175,6 +144,16 @@ G5kAutoPortForwarder
.. autoclass:: execo_g5k.utils.G5kAutoPortForwarder
:members:
Kaconsole
=========
Kaconsole
---------
.. inheritance-diagram:: execo_g5k.kadeploy.KaconsoleProcess
.. autoclass:: execo_g5k.kadeploy.KaconsoleProcess
:members:
:show-inheritance:
Grid5000 API utilities
======================
......@@ -331,17 +310,15 @@ Its default values are:
OAR keys
========
Oar/oargrid by default generate job specific ssh keys. So by default,
one has to retrieve these keys and explicitely use them for connecting
to the jobs, which is painfull. Another possibility is to tell
oar/oargrid to use specific keys. Oar can automatically use the key
pointed to by the environement variable ``OAR_JOB_KEY_FILE`` if it is
defined. Oargrid does not automatically use this key, but execo also
takes care of explicitely telling oargrid to use it if it is
defined. So the most convenient way to use execo/oar/oargrid, is to
set ``OAR_JOB_KEY_FILE`` in your ``~/.profile`` to point to your
internal Grid5000 ssh key and export this environment variable, or use
the ``oar_job_key_file`` in `execo_g5k.config.g5k_configuration`.
Oar by default generates job specific ssh keys. So by default, one has
to retrieve these keys and explicitly use them for connecting to the
jobs, which is painful. Another possibility is to tell oar to use
specific keys. Oar can automatically use the key pointed to by the
environment variable ``OAR_JOB_KEY_FILE`` if it is defined. So the
most convenient way to use execo/oar is to set ``OAR_JOB_KEY_FILE``
in your ``~/.profile`` to point to your internal Grid5000 ssh key and
export this environment variable, or use the ``oar_job_key_file`` in
`execo_g5k.config.g5k_configuration`.
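A minimal sketch of both approaches (the key path and login are
placeholders, adapt them to your setup)::

    # in ~/.profile (shell): export OAR_JOB_KEY_FILE=~/.ssh/id_rsa_grid5000
    # or, from a python script:
    from execo_g5k.config import g5k_configuration
    g5k_configuration['oar_job_key_file'] = "/home/<login>/.ssh/id_rsa_grid5000"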
Running from another host than a frontend
=========================================
......@@ -373,7 +350,7 @@ Then in ``~/.execo.conf.py`` put this code::
import re
default_connection_params = {
'host_rewrite_func': lambda host: re.sub("\.grid5000\.fr$", ".g5k", host),
'host_rewrite_func': lambda host: re.sub(r'\.grid5000\.fr$', '.g5k', host),
'taktuk_gateway': 'g5k'
}
......
......@@ -18,9 +18,9 @@ with packages managed by your distribution package manager.
- Install from a release tar.gz package::
$ wget https://gitlab.inria.fr/mimbert/execo/-/package_files/[...]/download -O execo-2.6.8.tar.gz
$ tar xzf execo-2.6.8.tar.gz
$ cd execo-2.6.8/
$ wget https://gitlab.inria.fr/mimbert/execo/-/package_files/[...]/download -O execo-2.8.2.tar.gz
$ tar xzf execo-2.8.2.tar.gz
$ cd execo-2.8.2/
$ python setup.py install --user
- Or install from source repository if you want the very latest
......@@ -40,7 +40,7 @@ with packages managed by your distribution package manager.
- Or install from debian package::
$ dpkg -i python-execo_2.6.8_all.deb
$ dpkg -i python-execo_2.8.2_all.deb
Configuration
=============
......@@ -144,9 +144,9 @@ List all files in the root directory::
from execo import *
process = Process("ls /")
process.run()
print "process:\n%s" + str(process)
print "process stdout:\n" + process.stdout
print "process stderr:\n" + process.stderr
print("process:\n%s" + str(process))
print("process stdout:\n" + process.stdout)
print("process stderr:\n" + process.stderr)
The ``ls`` process was directly spawned, not using a subshell. Set
process property ``shell`` to True if a full shell environment
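For instance, a short hedged sketch of the shell case (the command is
arbitrary)::

    process = Process("ls / | wc -l")
    process.shell = True
    process.run()
    print(process.stdout)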
......@@ -181,7 +181,7 @@ sender, then wait for *process_B* termination, then kill
sleep(1)
sender = SshProcess("echo 'hi there!' | nc -q 0 <host1> 6543", "<host2>").run()
receiver.wait()
print receiver.stdout
print(receiver.stdout)
This example shows the asynchronous control of processes: while a
process is running (the netcat receiver), the code can do something
......@@ -220,7 +220,7 @@ receiver)::
receiver.expect("^[Ll]istening on")
sender = SshProcess("echo 'hi there!' | nc -q 0 <host1> 6543", "<host2>").run()
receiver.wait()
print receiver.stdout
print(receiver.stdout)
Of course, this kind of code only works if you are sure that the
version of netcat which is installed on ``<host1>`` is the one you
......@@ -240,13 +240,13 @@ waits process output for regular expressions::
from execo import *
prompt = '^~ #\s*$'
with SerialSsh("<host>", "/dev/ttyUSB1", 115200).start() as serial:
print >> serial
print(file=serial)
serial.expect('^\w login:\s*$')
print >> serial, "<login>"
print("<login>", file=serial)
serial.expect('^Password:\s*$')
print >> serial, "<password>"
print("<password>", file=serial)
serial.expect(prompt)
print >> serial, "<command>"
print("<command>", file=serial)
Actions
-------
......@@ -301,9 +301,9 @@ generate traffic in both directions::
sleep(1)
clients.run()
servers.wait()
print Report([ servers, clients ]).to_string()
print(Report([ servers, clients ]).to_string())
for s in servers.processes + clients.processes:
print "%s\nstdout:\n%s\nstderr:\n%s" % (s, s.stdout, s.stderr)
print("%s\nstdout:\n%s\nstderr:\n%s" % (s, s.stdout, s.stderr))
The netcat command line on clients shows the usage of *substitutions*:
In the command line given for Remote and in paths given to Get, Put,
......@@ -334,13 +334,13 @@ expressions::
from execo import *
prompt = '^~ #\s*$'
with RemoteSerial(<list_of_hosts>, "/dev/ttyUSB1", 115200).start() as serial_ports:
print >> serial_ports
print(file=serial_ports)
serial_ports.expect('^\w login:\s*$')
print >> serial_ports, "<login>"
print("<login>", file=serial_ports)
serial_ports.expect('^Password:\s*$')
print >> serial_ports, "<password>"
print("<password>", file=serial_ports)
serial_ports.expect(prompt)
print >> serial_ports, "<command>"
print("<command>", file=serial_ports)
It is almost identical to the remote serial process example, except
that it handles several remote serial ports in parallel.
......@@ -359,14 +359,6 @@ API to use Grid5000 services:
- wait oar job start, get oar job nodes
- oargrid
- oargridsub, oargriddel
- get current oargrid jobs
- wait oargrid job start, get oargrid job nodes
- kadeploy3
- deploy: clever kadeploy: automatically avoids to deploy already
......@@ -453,7 +445,7 @@ from outside grid5000.
Here is another example of a one-liner to list the measured flops of
each cluster::
for cluster in get_g5k_clusters(): print cluster, ; print get_host_attributes(Host(cluster + "-1")).get("performance")
for cluster in get_g5k_clusters(): print(f"{cluster} {get_host_attributes(Host(cluster + '-1')).get('performance')}")
Other usage examples
====================
......@@ -499,8 +491,7 @@ This code shows:
- how to append a `execo.process.Process` ``stdout_handler`` which
redirects output to a file.
- how to take care of releasing the oargridjob with a try/finally
block.
- how to take care of releasing the oar job with a try/finally block.
After running this code, you get in the current directory on localhost
a file for each remote host containing the scaling governors,
......@@ -508,10 +499,9 @@ hyperthreading state, c-states state, turboboost state (easy to check
if they are all the same with ``cat * | sort -u``)
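As an aside, a minimal hedged sketch of the stdout-to-file redirection
technique mentioned above (host and file names are hypothetical; a plain
string appended to ``stdout_handlers`` is treated as an output file name)::

    from execo import SshProcess
    p = SshProcess("cat /sys/devices/system/cpu/cpu0/cpufreq/scaling_governor",
                   "<host>")
    p.stdout_handlers.append("governor.txt")
    p.run()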
Note that with this kind of code, there is still the possibility that
the oar or oargrid reservation fails, since oar is not transactional,
and someone can still reserve some resources between the moment we
inquire the available resources and the moment we perform the
reservation.
the oar reservation fails, since oar is not transactional, and
someone can still reserve some resources between the moment we inquire
the available resources and the moment we perform the reservation.
The planning module has several possibilities and modes, see its
documentation for further reference.
......@@ -545,8 +535,9 @@ could probably be increased on a node where you have root
permissions).
This example also shows using `execo_g5k.oar.oarsubgrid` instead of
`execo_g5k.oargrid.oargridsub`. They are similar but oarsubgrid
bypasses oargrid and directly performs parallel oar submissions.
oargridsub (a tool that existed in g5k a long time ago). They are
similar but oarsubgrid bypasses oargrid and directly performs parallel
oar submissions.
Compare ChainPut and parallel scp performances on many hosts on Grid5000
......@@ -567,9 +558,9 @@ hosts, you should run this code from a compute node, not the frontend
In this example, we use ``oarsh``. One of the constraints imposed by
``Taktuk`` is that any node of the connection tree must be able to
connect to any other. As the oargrid job key is only available on the
frontend on which the oargrid submission was done, we must propagate
this key to all nodes. This can be done with ``Taktuk`` option
connect to any other. As the oar job key is only available on the
frontend on which the oar submission was done, we must propagate this
key to all nodes. This can be done with ``Taktuk`` option
``-S``. Alternatively, this is not needed if setting
``$OAR_JOB_KEY_FILE`` in your environment, or setting
``g5k_configuration['oar_job_key_file']``, as described in
......@@ -728,11 +719,11 @@ and debugger.
Using yaml for storing the results allows incrementally appending to
the file, and has the benefit of a human readable file. If there is an
error during the experiment (such as the end of the oargrid
reservation), the experiment can later be restarted in the same result
directory with option ``-C``, continuing from where it stopped. It is
even possible to change the parameter combinations, only the yet
undone combinations will be done.
error during the experiment (such as the end of the oar reservation),
the experiment can later be restarted in the same result directory
with option ``-C``, continuing from where it stopped. It is even
possible to change the parameter combinations; only the combinations
not yet done will be run.
Below is an example showing how to load the results and draw a graph:
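That full example is not shown in this diff hunk; a minimal sketch of just
the loading step, mirroring the drawing script shown earlier, would be::

    import sys, yaml
    with open(sys.argv[1]) as f:
        results = yaml.load(f, Loader=yaml.UnsafeLoader)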
......@@ -776,7 +767,7 @@ connect to the nodes from outside::
import re
def host_rewrite_func(host):
return re.sub("\.grid5000\.fr$", ".g5k", host)
return re.sub(r'\.grid5000\.fr$', '.g5k', host)
def frontend_rewrite_func(host):
return host + ".g5k"
......
#!/usr/bin/env python
# Copyright 2009-2021 INRIA Rhone-Alpes, Service Experimentation et
# Copyright 2009-2025 INRIA Rhone-Alpes, Service Experimentation et
# Developpement
# This file is part of Execo, released under the GNU Lesser Public
# License, version 3 or later.
......@@ -38,6 +38,7 @@ def get_git_version():
except EnvironmentError:
return None
version = p.communicate()[0].rstrip()
version = version.replace('-', '+', 1).replace('-', '.').lstrip('v')
if p.returncode != 0:
return None
return version
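# Editor's illustrative sketch (not part of the diff): the transformation above
# turns a hypothetical `git describe` output into a PEP 440 compliant version
# with a local version segment:
example = "v2.8.2-5-gabc1234"
assert example.replace('-', '+', 1).replace('-', '.').lstrip('v') == "2.8.2+5.gabc1234"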
......@@ -187,6 +188,7 @@ if __name__ == "__main__":
version = version,
description = "Python API for asynchronous control of local or remote, standalone or parallel, unix processes.",
long_description = read_file("README.md"),
long_description_content_type="text/markdown",
author = 'Matthieu Imbert',
author_email = 'matthieu.imbert@inria.fr',
url = 'https://gitlab.inria.fr/mimbert/execo',
......
# Copyright 2009-2021 INRIA Rhone-Alpes, Service Experimentation et
# Copyright 2009-2025 INRIA Rhone-Alpes, Service Experimentation et
# Developpement
#
# This file is part of Execo.
......
# Copyright 2009-2021 INRIA Rhone-Alpes, Service Experimentation et
# Copyright 2009-2025 INRIA Rhone-Alpes, Service Experimentation et
# Developpement
#
# This file is part of Execo.
......@@ -27,11 +27,16 @@ from .report import Report
from .ssh_utils import get_rewritten_host_address, get_scp_command, \
get_taktuk_connector_command, get_ssh_command
from .utils import name_from_cmdline, non_retrying_intr_cond_wait, intr_event_wait, get_port, \
singleton_to_collection
singleton_to_collection, is_string
from traceback import format_exc
from .substitutions import get_caller_context, remote_substitute
from .time_utils import get_seconds, format_date, Timer
import threading, time, pipes, tempfile, os, shutil, stat
import threading, time, sys, tempfile, os, shutil, stat, functools
if sys.version_info >= (3,):
from shlex import quote
else:
from pipes import quote
class ActionLifecycleHandler(object):
......@@ -314,11 +319,8 @@ class Action(object):
self.kill()
return False
def expect(self, regexes, timeout = False, stream = STDOUT, backtrack_size = 2000, start_from_current = False):
"""searches the process output stream(s) for some regex. It mimics/takes ideas from Don Libes expect, or python-pexpect, but in parallel on several processes.
It is an easier-to-use frontend for
`execo.process.ExpectOutputHandler`.
def expect(self, regexes, timeout=False, stream=STDOUT, backtrack=True, expect_output_handler=None):
"""searches the process output stream(s) for some regex. It mimics/takes ideas from Don Libes expect, or python-pexpect, and does so on multiple processes running in parallel.
It waits for a regex to match on all processes. Then it
returns a list of tuples (process, regex index, match
......@@ -327,68 +329,50 @@ class Action(object):
tuple is (process, None, None). The returned list has the same
process sort order as self.processes.
It uses `execo.process.ProcessBase.expect`, see its
documentation for more details.
It waits for a regex to match, then returns the tuple
(regex_index, match_object), or (None, None) if timeout
reached, or if eof reached, or stream in error, without any
match.
It uses thread local storage such that concurrent expects in
parallel threads do not interfere with each other.
:param regexes: a regex or list of regexes. May be given as string
or as compiled regexes.
:param regexes: a regex or list of regexes. May be given as
string or as compiled regexes (If given as compiled regexes,
do not forget flags, most likely re.MULTILINE; regexes passed
as strings are compiled with re.MULTILINE)
:param timeout: wait timeout after which it returns (None,
None) if no match was found. If False (the default): use the
default expect timeout. If None: no timeout.
default expect timeout. If None: no timeout. If 0: return
immediately, no blocking if no match found.
:param stream: stream to monitor for this process, STDOUT or
STDERR.
:param backtrack_size: Each time some data is received, this
ouput handler needs to perform the regex search not only on
the incoming data, but also on the previously received data,
or at least on the last n bytes of the previously received
data, because the regex may match on a boundary between what
was received in a previous read and what is received in the
incoming read. These n bytes are the backtrack_size. (for
special cases: if backtrack_size == None, the regex search
is always done on the whole received data, but beware, this
is probably not what you want)
:param start_from_current: boolean. If True: when a process is
monitored by this handler for the first time, the regex
matching is started from the position in the stream at the
time that this output hander starts receiving data. If
False: when a process is monitored by this handler for the
first time, the regex matching is started from the beginning
of the stream.
:param backtrack: If True (the default), the first expect is
done from the beginning of the process start. If False, the
first expect is done from the next received process output.
:param expect_output_handler: If not None, a specific
ExpectOutputHandler instance to use. Otherwise, if None (the
default), a thread local ExpectOutputHandler is
instantiated.
"""
if timeout == False: timeout = self.default_expect_timeout
countdown = Timer(timeout)
cond = threading.Condition()
num_found_and_list = [0, {}]
for p in self.processes: num_found_and_list[1][p] = (None, None)
def internal_callback(process, stream, re_index, match_object):
num_found_and_list[0] +=1
num_found_and_list[1][process] = (re_index, match_object)
with cond:
cond.notify_all()
if self._thread_local_storage.expect_handler == None:
self._thread_local_storage.expect_handler = ExpectOutputHandler()
self._thread_local_storage.expect_handler.expect(regexes,
callback = internal_callback,
backtrack_size = backtrack_size,
start_from_current = start_from_current)
with cond:
for p in self.processes:
if stream == STDOUT:
p.stdout_handlers.append(self._thread_local_storage.expect_handler)
else:
p.stderr_handlers.append(self._thread_local_storage.expect_handler)
while (countdown.remaining() == None or countdown.remaining() > 0) and num_found_and_list[0] < len(self.processes):
non_retrying_intr_cond_wait(cond, countdown.remaining())
retval = []
countdown = Timer(timeout)
for p in self.processes:
if num_found_and_list[1][p][0] == None:
p._notify_expect_fail(regexes)
retval.append((p, num_found_and_list[1][p][0], num_found_and_list[1][p][1]))
if timeout:
p_timeout = max(0, countdown.remaining())
else:
p_timeout = None
f = p.expect(regexes, timeout=p_timeout, stream=stream, backtrack=backtrack)
retval.append((p, f[0], f[1]))
return retval
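# ---- editor's illustrative sketch, not part of execo nor of this diff ----
# Typical use of the new Action.expect() signature; the command, hosts and
# regexes below are hypothetical.
def _expect_usage_sketch(hosts):
    act = Remote("tail -f /var/log/syslog", hosts).start()
    results = act.expect([r"error", r"ready"], timeout=60)
    for process, re_index, match in results:
        if re_index is None:
            print("no match (timeout or eof) on %s" % (process.host,))
        else:
            print("regex %d matched on %s: %s" % (re_index, process.host, match.group(0)))
    act.kill()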
def wait_any_actions(actions, timeout = None):
......@@ -543,10 +527,20 @@ class Remote(Action):
self.processes = []
processlh = ActionNotificationProcessLH(self, len(self.hosts))
for (index, host) in enumerate(self.hosts):
this_process_args = self.process_args.copy()
for handler_kind in ['stdout_handlers', 'stderr_handlers']:
if handler_kind in this_process_args and this_process_args[handler_kind]:
new_handlers = []
for h in this_process_args[handler_kind]:
if is_string(h):
new_handlers.append(remote_substitute(h, self.hosts, index, self._caller_context))
else:
new_handlers.append(h)
this_process_args[handler_kind] = new_handlers
p = SshProcess(remote_substitute(self.cmd, self.hosts, index, self._caller_context),
host = host,
connection_params = self.connection_params,
**self.process_args)
**this_process_args)
p.lifecycle_handlers.append(processlh)
self.processes.append(p)
......@@ -636,7 +630,7 @@ class _TaktukRemoteOutputHandler(ProcessOutputHandler):
logger.critical("line received = %s", string.rstrip('\n'))
return s
def read_line(self, process, stream, string, eof, error):
def read_line(self, process, stream, string):
# my taktuk output protocol:
# stream format header normal?
# output "A $position # $line" 65 YES
......@@ -661,9 +655,9 @@ class _TaktukRemoteOutputHandler(ProcessOutputHandler):
else:
process = self.taktukaction.processes[self.taktukaction._taktuk_hosts_order[position-1]]
if header == 65: # stdout
process._handle_stdout(line, eof, error)
process._handle_stdout(line, False, False)
elif header == 66: # stderr
process._handle_stderr(line, eof, error)
process._handle_stderr(line, False, False)
else: # 67: status
process._set_terminated(exit_code = int(line))
elif header in (68, 69): # connector, state
......@@ -889,7 +883,7 @@ class TaktukRemote(Action):
port = global_port,
connection_params = self.connection_params)))
real_taktuk_cmdline += ("-F", taktuk_options_filename)
real_taktuk_cmdline = " ".join([pipes.quote(arg) for arg in real_taktuk_cmdline])
real_taktuk_cmdline = " ".join([quote(arg) for arg in real_taktuk_cmdline])
real_taktuk_cmdline += " && rm -f " + taktuk_options_filename
self._taktuk = Process(real_taktuk_cmdline)
#self._taktuk.close_stdin = False
......@@ -987,9 +981,7 @@ class Put(Remote):
processlh = ActionNotificationProcessLH(self, len(self.hosts))
for (index, host) in enumerate(self.hosts):
real_command = list(get_scp_command(host.user, host.keyfile, host.port, self.connection_params)) + [ remote_substitute(local_file, self.hosts, index, self._caller_context) for local_file in self.local_files ] + ["%s:%s" % (get_rewritten_host_address(host.address, self.connection_params), remote_substitute(self.remote_location, self.hosts, index, self._caller_context)),]
real_command = ' '.join(real_command)
p = Process(real_command)
p.shell = True
p.lifecycle_handlers.append(processlh)
p.host = host
self.processes.append(p)
......@@ -1051,9 +1043,7 @@ class Get(Remote):
for path in self.remote_files:
remote_specs += ("%s:%s" % (get_rewritten_host_address(host.address, self.connection_params), remote_substitute(path, self.hosts, index, self._caller_context)),)
real_command = get_scp_command(host.user, host.keyfile, host.port, self.connection_params) + remote_specs + (remote_substitute(self.local_location, self.hosts, index, self._caller_context),)
real_command = ' '.join(real_command)
p = Process(real_command)
p.shell = True
p.lifecycle_handlers.append(processlh)
p.host = host
self.processes.append(p)
......@@ -1071,7 +1061,7 @@ class _TaktukPutOutputHandler(_TaktukRemoteOutputHandler):
else:
process._set_terminated(exit_code = 0)
def read_line(self, process, stream, string, eof, error):
def read_line(self, process, stream, string):
try:
if len(string) > 0:
header = ord(string[0])
......@@ -1223,7 +1213,7 @@ class _TaktukGetOutputHandler(_TaktukRemoteOutputHandler):
else:
process._set_terminated(exit_code = 0)
def read_line(self, process, stream, string, eof, error):
def read_line(self, process, stream, string):
try:
if len(string) > 0:
header = ord(string[0])
......
# Copyright 2009-2021 INRIA Rhone-Alpes, Service Experimentation et
# Copyright 2009-2025 INRIA Rhone-Alpes, Service Experimentation et
# Developpement
#
# This file is part of Execo.
......@@ -284,9 +284,7 @@ class _Conductor(object):
maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
if (maxfd == resource.RLIM_INFINITY):
maxfd = MAXFD
for fd in range(0, maxfd):
try: os.close(fd)
except OSError: pass
os.closerange(0, MAXFD)
while True:
# poll each second that my parent is still alive. If
# not, die. Optionally kill all instantiated children.
......@@ -376,6 +374,7 @@ class _Conductor(object):
reaper_thread = threading.Thread(target = self.__reaper_thread_func, name = "Reaper")
reaper_thread.setDaemon(True)
reaper_thread.start()
threading.Thread(target = process._trigger_start_handlers).start()
def __handle_update_process(self, process):
# intended to be called from conductor thread
......