Commit 20e0ee4c authored by Maverick Chardet's avatar Maverick Chardet
Browse files

Added synchronize_timeout function in Assembly + example

+ other stuff
parent a4c40d95
......@@ -141,6 +141,12 @@ class Assembly (object):
self.synchronize()
self.alive = False
self.semantics_thread.join()
def kill(self):
"""Warning: use only as last resort, anything run by the assembly will be killed, not exiting properly code
run by the transitions """
self.alive = False
self.semantics_thread.join()
def print(self, string : str):
if self.verbosity < 0:
......@@ -319,6 +325,19 @@ class Assembly (object):
Printer.st_err_tprint("Synchronizing. %d unfinished tasks:\n- %s (in progress)\n%s\n"%(
self.instructions_queue.unfinished_tasks, str(self.current_instruction), "\n".join(["- %s"%str(ins) for ins in self.instructions_queue.queue])))
self.instructions_queue.join()
def synchronize_timeout(self, time):
from concerto.utility import timeout
finished = False
with timeout(time):
self.synchronize()
finished = True
if finished:
return True, None
else:
return False, self.get_debug_info()
def is_component_idle(self, component_name : str) -> bool:
......
......@@ -71,7 +71,6 @@ class WeightedGraph:
i += 1
treated_vertices = set()
treated_arcs = set()
gstr = "digraph G {\n"
gstr += "\trankdir=BT;\n"
......@@ -126,6 +125,20 @@ class WeightedGraph:
self._graph[v] = list(filter(lambda t: t[0] not in not_reached, self._graph[v]))
return not_reached
def detect_cycle(self, source_vertex):
def _dfs(vertex, visited):
if vertex in visited:
i = visited.index(vertex)
cycle = visited[i:] + [vertex]
return cycle
for (child, _, _) in self._graph[vertex]:
r = _dfs(child, visited+[vertex])
if r:
return r
return None
return _dfs(source_vertex, [])
def get_longest_path_length(self, source_vertex, destination_vertex, transitions_lengths):
def _topological_order():
unmarked = set(self._graph.keys())
......
......@@ -95,3 +95,28 @@ def remove_if(l : List[Any], remove_cond : Callable[[Any], bool]):
def empty_transition():
pass
import signal
from contextlib import contextmanager
@contextmanager
def timeout(time):
# Register a function to raise a TimeoutError on the signal.
signal.signal(signal.SIGALRM, raise_timeout)
# Schedule the signal to be sent after ``time``.
signal.alarm(time)
try:
yield
except TimeoutError:
pass
finally:
# Unregister the signal so it won't be triggered
# if the timeout is not reached.
signal.signal(signal.SIGALRM, signal.SIG_IGN)
def raise_timeout(signum, frame):
raise TimeoutError
from mad import *
from concerto.all import *
import time
......
from concerto.component import Component
from concerto.dependency import DepType
from concerto.assembly import Assembly
from concerto.reconfiguration import Reconfiguration
from concerto.meta import ReconfigurationPerfAnalyzer
class HAProxy(Component):
def create(self):
self.places = [
'initiated',
'deployed'
]
self.transitions = {
'deploy': ('initiated', 'deployed', 'deploy', 0, self.deploy)
}
self.dependencies = {
'haproxy': (DepType.PROVIDE, ['deployed']),
'facts': (DepType.USE, ['deploy'])
}
self.initial_place = 'initiated'
def deploy(self):
pass
a = Assembly()
deploy = Reconfiguration()
deploy.add("proxy", HAProxy)
deploy.push_behavior("proxy", "deploy")
deploy.wait_all()
pa = ReconfigurationPerfAnalyzer(deploy)
g = pa.get_graph()
g.save_as_dot("haproxy.dot")
formula = pa.get_exec_time_formula()
print(formula.to_string(lambda x: '.'.join(x)))
import time
from concerto.all import Component, Assembly
class SimpleComponent(Component):
def __init__(self, deploy_time):
self.deploy_time = deploy_time
super().__init__()
def create(self):
self.places = [
'undeployed',
'deployed'
]
self.transitions = {
'deploy_transition': ('undeployed', 'deployed', 'deploy', 0, self.deploy_function)
}
self.initial_place = "undeployed"
def deploy_function(self):
time.sleep(self.deploy_time)
class SimpleAssembly(Assembly):
def __init__(self, deploy_time):
super().__init__()
self.simple_component = SimpleComponent(deploy_time)
def deploy(self):
self.add_component("my_component", self.simple_component)
self.push_b("my_component", "deploy")
self.wait_all()
assembly = SimpleAssembly(5)
assembly.deploy()
finished, debug_info = assembly.synchronize_timeout(2)
if finished:
print("Deployement successful")
assembly.terminate()
else:
print("Error! Timeout on deploy! Debug info:")
print(debug_info)
assembly.kill()
# Transitions may still be running at this point because we can't simply kill the threads running the transitions
print("Will terminate when the running transitions are over...")
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment