# -*- coding: utf-8 -*- ## Filename : CLUnfolder.py ## Author(s) : Michel Le Borgne ## Created : 22 mars 2012 ## Revision : ## Source : ## ## Copyright 2012 : IRISA/IRSET ## ## This library is free software; you can redistribute it and/or modify it ## under the terms of the GNU General Public License as published ## by the Free Software Foundation; either version 2.1 of the License, or ## any later version. ## ## This library is distributed in the hope that it will be useful, but ## WITHOUT ANY WARRANTY, WITHOUT EVEN THE IMPLIED WARRANTY OF ## MERCHANTABILITY OR FITNESS FOR A PARTICULAR PURPOSE. The software and ## documentation provided here under is on an "as is" basis, and IRISA has ## no obligations to provide maintenance, support, updates, enhancements ## or modifications. ## In no event shall IRISA be liable to any party for direct, indirect, ## special, incidental or consequential damages, including lost profits, ## arising out of the use of this software and its documentation, even if ## IRISA have been advised of the possibility of such damage. See ## the GNU General Public License for more details. ## ## You should have received a copy of the GNU General Public License ## along with this library; if not, write to the Free Software Foundation, ## Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. ## ## The original code contained here was initially developed by: ## ## Michel Le Borgne. ## IRISA ## Symbiose team ## IRISA Campus de Beaulieu ## 35042 RENNES Cedex, FRANCE ## ## ## Contributor(s): Geoffroy Andrieux IRSET ## """ Main engine for constraint unfolding and solving """ from __future__ import print_function #from pyCryptoMS import CryptoMS from pycryptosat import Solver as CryptoMS from cadbiom.models.biosignal.translators.gt_visitors import compile_cond, compile_event from cadbiom.models.clause_constraints.CLDynSys import Clause, Literal from cadbiom.models.clause_constraints.mcl.MCLSolutions import RawSolution, MCLException from cadbiom import commons as cm # C++ API from _cadbiom import shift_clause, shift_dynamic, forward_code, \ forward_init_dynamic # Standard imports from logging import DEBUG import itertools as it LOGGER = cm.logger() class CLUnfolder(object): """ When loading a model in a MCLAnalyser object, a CLUnfolder is generated which implies the compilation of the dynamical system into a clause constraint dynamical system and the DIMACS coding of all the variables. This coding cannot be changed later. The unfolding of constraints is performed efficiently on the numeric form of the clause constraints. The CLUnfolder provide various method to convert variable names to DIMACS code and back, to extract the frontier of a model and to extract informations from raw solutions. Dynamic constraints unfolding management ======================================== Each variable is coded as an integer (in var_code_table and var_list). Each clause is represented as a list of signed integers (DIMACS coding). During unfolding, clauses are shifted either to the future (forward) or to the past (backward). The shift operator is a simple addition of self.__shift_step. The shift direction depends only on initializations. self.__shift_step depends on the number of variables so it is impossible to add variables while generating a trajectory unfolding. Glossary ======== - ground variables/ground dimacs code: variables at time 0/their encoding. - solution: a solution of the logical dynamic constraint from SAT solver - state vector: list of DIMACS code of original (not shifted) variables corresponding to a step. - trajectory: list of state_vectors """ def __init__(self, dynamic_system): """ :param dynamic_system: Describe a dynamic system in clause form. :type dynamic_system: """ self.dynamic_system = dynamic_system # symbolic clause dynamic system # shift_step equals to the total number of coded variables # (including inputs, entities, clocks/events, auxiliary variables) # shift_step_init: the shift step ss (if n is X_0 code, n+ss is X_1 code) self.shift_step_init = dynamic_system.get_var_number() # shift step of system (number of variables of the system) self.__shift_step = self.shift_step_init # current shift/step # self.shift_step must be frozen in order to avoid problems during the # unflatten() step of RawSolutions. # => Each step MUST have the same number of variables. # Thus, we lock the Unfolder by turning self.__locked to True. # See __shift_clause() and __m_shift_clause() self.__locked = False self.__current_step = 1 # current number of unfolding # For reachability optimisation. Number of shifts before checking # the final property self.__steps_before_check = 0 # assign a DIMACS code number to each variable (invariant) # Create mappings between names and values of the variables of the system # dynamic_system.base_var_set : Set of ALL variables of the dynamic system # (including inputs, entities, clocks/events, auxiliary variables). self.__var_code_table = dict() # Mapping: name -> DIMACS code (!= 0) self.__var_list = ['##'] # Mapping: DIMACS code (!= 0) -> name # Fix order of variables names with a list: # - indexes of __var_list are the values of the variables at these indexes # - values begin from 1 (0 is a blacklisted value) base_var_set = list(dynamic_system.base_var_set) self.__var_list += base_var_set # keys: are the names of the variables, values: their values self.__var_code_table = \ {var_name: var_num for var_num, var_name in enumerate(base_var_set, 1)} # Optimization: for forward_init_dynamic() and forward_code() (mostly # implemented in C. # Precompute __var_code_table with the literal names in "future" format; # i.e with a last char '`' at the end. # Thus it is easy to search them directly in the dict (in O(1)), # instead of slicing them. # Note: "future" literals are already in self.dynamic_system.list_clauses # and maybe in self.dynamic_system.aux_list_clauses temp_var_code_table = \ {var_name + "`": var_num for var_name, var_num in self.__var_code_table.iteritems()} self.__var_code_table.update(temp_var_code_table) assert len(self.__var_code_table) == 2 * self.__shift_step # Include auxiliary clause to eliminate undesirable solutions # If A->B has clock h, an aux constraint added is h included in h when A # Must be set to False for inhibitors computations self.__include_aux_clauses = True self.__lit_cpt = self.__shift_step + 1 # counter for aux. var. coding # Same mapping tools for auxiliary variables introduced by properties # compilation # Cf __code_clause() self.__aux_code_table = dict() # name to code self.__aux_list = [] # code to name # ordered list of DIMACS codes of the simple variables/places # Disable all variables of places (entities) in the model (except frontiers) self.__no_frontier_init = [[-self.__var_code_table[nfp]] for nfp in self.dynamic_system.no_frontiers] # ordered list of DIMACS codes of the frontier variables/places self.frontier_values = [self.__var_code_table[frp] for frp in self.dynamic_system.frontiers] self.frontier_values.sort() ## TODO: sort utile ici ???? si non passer en frozenset et supprimer les casts partout ailleurs ## Cf RawSolution.frontier_pos_and_neg_values BACKWARD, encore utilisé avec index ## Cf TestCLUnfolder.test_frontier indexable mais peut etre contourné # Precompute convenient attributes for: # frontiers_pos_and_neg: # - RawSolution.frontier_pos_and_neg_values # (set operation with solution variables) # frontiers_negative_values: # - MCLQuery.from_frontier_sol_new_timing # - MCLQuery.frontiers_negative_values # - MCLAnalyser.__solve_with_inact_fsolution # - TestCLUnfolder.test_prune # # Set of frontier positive and negative values # (all frontiers and their opposite version). # => operations with sets are much faster self.frontiers_negative_values = \ frozenset(-frontier for frontier in self.frontier_values) self.frontiers_pos_and_neg = \ self.frontiers_negative_values | frozenset(self.frontier_values) # DIMACS codes of the input variables/places for extraction (inv) self.__inputs = frozenset(self.__var_code_table[inp] for inp in self.dynamic_system.inputs) # DIMACS codes of the free_clocks variables/places for extraction (invariant) self.__free_clocks = frozenset(self.__var_code_table[fcl] for fcl in self.dynamic_system.free_clocks) # Binding for a merged version of __inputs and __free_clocks # Convenient attribute for RawSolution.extract_act_input_clock_seq() # DIMACS codes of the input and free_clocks variables self.inputs_and_free_clocks = self.__inputs | self.__free_clocks # Properties to be checked self.reset() # Logical constraints: # Result from unfolding of base constraints # Boolean vectors signification: # X: Current state of places (activated/unactivated) # X': Future state of places # H: Current 'free' events (present/not present) => ? # I: Current inputs => ? self.__dynamic_constraints = [] # DIMACS clauses: X' = f(X,H,I) # Simple temporal properties: # SP(X0): Initial property/start property; Never change at each step. # IP(X): Invariant property # VP(X): Variant property # List of logical formulas of properties forced at each step # It's the trajectory of events of a solution. # FP(X): Final property self.__initial_constraints = [] # DIMACS clauses: C(X_0) >> self.__final_constraints = [] # idem: C(X_n) self.__invariant_constraints = [] # DIMACS clauses: C(X_i)) self.__variant_constraints = [] # variant constraints along trajectory self.__shift_direction = None # FORWARD or BACKWARD # statistics on solver self.__stats = False self.__nb_vars = 0 self.__nb_clauses = 0 def reset(self): """Reset the unfolder before a new query Reset only properties and dimacs clauses from the current query; AND __list_variant_constraints. This function is called from the constructor and during MCLAnalyser.sq_is_satisfiable() and MCLAnalyser.sq_solutions() following the call of init_with_query(). """ # Properties to be checked self.__initial_property = None # logical formula - literal boolean expression self.__dimacs_initial = None # list of DIMACS clauses self.__final_property = None # logical formula self.__dimacs_final = None # list of DIMACS clauses self.__invariant_property = None # logical formula self.__dimacs_invariant = None # list of DIMACS clauses self.__variant_property = None # list self.__dimacs_variant = None # list> # list of variant temporal constraints in Dimacs ground code self.__list_variant_constraints = None # list> # If this function is called from the constructor, # the following variables are just redefined here. # For reachability optimisation. Number of shifts before checking # the final property self.__steps_before_check = 0 self.__shift_direction = None # FORWARD or BACKWARD self.__locked = False def init_with_query(self, query): """Initialise the unfolder with the given query Following attributes are used from the query: start_prop, dim_start, inv_prop, dim_inv, final_prop, dim_final, variant_prop, dim_variant_prop, steps_before_check Textual properties of query are compiled into numeric clauses in init_forward_unfolding(), just before squery_is_satisfied() and squery_solve(). This compilation step is costly due to ANTLR performances... """ # Reset the unfolder before a new query self.reset() # Init with query properties and clauses self.__initial_property = query.start_prop # logical formula in text self.__dimacs_initial = query.dim_start self.__final_property = query.final_prop # logical formula in text self.__dimacs_final = query.dim_final self.__invariant_property = query.inv_prop # logical formula in text self.__dimacs_invariant = query.dim_inv # It's the trajectory of events of a solution. self.__variant_property = query.variant_prop # logical formula in text self.__dimacs_variant = query.dim_variant_prop # For reachability optimisation. Number of shifts before checking # the final property (default 0) self.__steps_before_check = query.steps_before_check def set_stats(self): """Enable solver statistics (for tests)""" self.__stats = True self.__nb_vars = 0 self.__nb_clauses = 0 def unset_stats(self): """Disable solver statistics (never used)""" self.__stats = False def _stats(self): """Display solver statistics""" print("\n NB Variables in solver:", self.__nb_vars) print("NB Clauses in solver:", self.__nb_clauses) def set_include_aux_clauses(self, val): """ Include auxiliary clause to eliminate undesirable solutions If A->B has clock h, an aux constraint added is h included in h when A Must be set to False for inhibitors computations """ self.__include_aux_clauses = val ## Internal variable access for tests (never used) ######################### @property def dynamic_constraints(self): """For tests: returns coded dynamic constraints""" return self.__dynamic_constraints @property def initial_constraints(self): """For tests: returns coded initial constraints""" return self.__initial_constraints @property def invariant_constraints(self): """For tests: returns coded invariant constraints""" return self.__invariant_constraints @property def variant_constraints(self): """For tests: returns coded variant constraints""" return self.__variant_constraints @property def final_constraints(self): """For tests: returns coded final constraints""" self.__final_constraints ## Variables management #################################################### def var_names_in_clause(self, clause): """Return the names of the variables from values in the given numeric clause (DEBUG never used) """ return [self.get_var_indexed_name(var) for var in clause] def var_dimacs_code(self, var_name): """Returns DIMACS code of var_name (string) variable (for tests)""" return self.__var_code_table[var_name] def get_system_var_number(self): """Get number of variables in the clause constraint dynamical system (including inputs, entities, clocks/events, auxiliary variables) (for tests) .. note:: This number should be equal to the number of variables in self.get_var_number() """ return self.dynamic_system.get_var_number() def get_var_number(self): """Get number of principal variables (properties excluded) in the unfolder (including inputs, entities, clocks/events, auxiliary variables) (for tests) .. note:: This number should be equal to the number of variables in self.get_system_var_number() """ # Remove the blacklisted first item (##) for variables naming return len(self.__var_list) - 1 def get_var_name(self, var_num): """Get name of the variable .. seealso:: Explanations on get_var_indexed_name() @param var_num: DIMACS literal coding of an initial variable @return: name of the variable """ # Get the original var_code without shifts var_code = (abs(var_num) - 1) % self.__shift_step + 1 if var_code <= self.shift_step_init: # Variable num is less than the number of the variables in the system # => variable from the initial system return self.__var_list[var_code] else: # auxiliary variable introduced by properties compilation return self.__aux_list[var_code - self.shift_step_init -1] #raise MCLException("Not a DIMACS code of an initial variable") def get_var_indexed_name(self, var_num): """Get name of the variable with the time index appended .. note:: time index is the number of steps since the begining of the simulation. :Example: __shift_step = 2 (current step of the unfolding) shift_step_init = 2 (number of variables in the system) __var_list = ["##", "n1", "n2"] Virtual list of indexes in the state vector of a solution: [0, 1, 2, 3, 4, 5, 6, 7, 8, ...] #|n1,n2|n1,n2|n1,n2|n1,n2| ti=0 ti=1 ti=2 ti=3 Given var_num = 7: index of var_name in __var_list: ((7-1) % 2) + 1 = 1 var_name = "n1" time index: (7 - 1) / 2 = 3 => return: "n1_3" @param var_num: DIMACS literal coding @return: name of the variable with the time index appended """ varnum1 = abs(var_num) # Get the original var_code without shifts var_code = (varnum1 - 1) % self.__shift_step + 1 var_step = (varnum1 - var_code) / self.__shift_step if var_code <= self.shift_step_init: # Variable num is less than the number of the variables in the system # => variable from the initial system return self.__var_list[var_code] + '_%s'% var_step else: # Auxiliary variable introduced by properties compilation index = var_code - self.shift_step_init - 1 return self.__aux_list[index] + '_%s'% var_step def get_free_clocks(self): """Get the DIMACS codes of the free_clocks variables""" return self.__free_clocks def get_inputs(self): """Get the DIMACS codes of the input variables""" return self.__inputs def get_shift_direction(self): """ @return: string "FORWARD" or "BACKWARD" """ return self.__shift_direction def get_shift_step(self): """ @return: the shift step ss (if n is X_0 code, n+ss is X_1 code) """ return self.__shift_step def get_current_step(self): """ @return: the current number of unfolding """ return self.__current_step ## Translation from names to num codes ##################################### ## The translation depends on the shift direction def __forward_code(self, clause): """(deprecated, directly included in C++ module: forward_init_dynamic()) Numerically code a clause with the numeric code found in self.__var_code_table for a base variable x, and numeric_code + shift_step for x' variable ..note:: Future variables x' are noted "name`" in Literal names. Values of variables increases of __shift_step in future steps. @param clause: a Clause object @return: the DIMACS coding of the forward shifted clause """ # Old API # num_clause = [] # for lit in clause.literals: # if lit.name[-1] == '`': # t+1 variable # num_clause.append( # -(self.__var_code_table[lit.name[:-1]] + self.__shift_step) \ # if not lit.sign \ # else (self.__var_code_table[lit.name[:-1]] + self.__shift_step) # ) # else: # t variable # num_clause.append( # -self.__var_code_table[lit.name] \ # if not lit.sign else self.__var_code_table[lit.name] # ) # return num_clause # New API via C++ module return forward_code(clause, self.__var_code_table, self.__shift_step) def __backward_code(self, clause): """(never used, backward is partially implemented) numerically code a clause with the numeric code found in self.__var_code_table + shift_step for a base variable x, and numeric_code for x' variable integer coding increases in past steps @param clause: a Clause object @return: the DIMACS coding of the backward shifted clause """ num_clause = [] for lit in clause.literals: if lit.name[-1] == '`': # t+1 variable num_clause.append( -self.__var_code_table[lit.name[:-1]] \ if not lit.sign \ else self.__var_code_table[lit.name[:-1]] ) else: # t variable, base variable num_clause.append( -(self.__var_code_table[lit.name] + self.__shift_step) \ if not lit.sign \ else (self.__var_code_table[lit.name] + self.__shift_step) ) return num_clause def __code_clause(self, clause): """Numerically code a clause The numerical values are found in self.__var_code_table for variables in the dynamic system, or in self.__aux_code_table for other auxiliary variables; assume all variables are basic variables (t=0) @warning: MODIFIES SHIFT_STEP if auxiliary variables are present (numeric code) Because we add a variable in the system... :param clause: :return: List of numerical values corresponding to the literals in the given clause. :type clause: :rtype: > """ num_clause = [] for lit in clause.literals: # Get variable value with the given name name = lit.name if name in self.__var_code_table: var_cod = self.__var_code_table[name] elif name in self.__aux_code_table: var_cod = self.__aux_code_table[name] else: # Create an auxiliary variable - modifies shift_step (nb of variables) self.__shift_step += 1 # Mapping name to value code self.__aux_code_table[name] = self.__shift_step # Mapping value code to name self.__aux_list.append(name) var_cod = self.__shift_step # Add the sign to the value lit_code = var_cod if lit.sign else -var_cod num_clause.append(lit_code) return num_clause ## Dynamic initialisations ################################################# def __forward_init_dynamic(self): """Dynamics initialisations. Set dynamic constraints for a forward one step: X1 = f(X0) Numerically code clauses with the numeric codes found in self.__var_code_table for a base variable x, and numeric_code + shift_step for x' variable integer coding increases in future steps. __dynamic_constraints is a list of lists of numeric clauses (lists of ints) Each sublist of __dynamic_constraints corresponds to a step in the unfolder; the last step is the last element. .. note:: Called by init_forward_unfolding() .. note:: Future variables x' are noted "name`" in Literal names. .. note:: Values of variables increases of __shift_step in future steps. """ # New API via C++ module # TODO: take a look at __backward_init_dynamic & __backward_code if self.__include_aux_clauses: # Auxiliary clauses are supported (default) self.__dynamic_constraints = \ forward_init_dynamic(self.dynamic_system.list_clauses, self.__var_code_table, self.__shift_step, self.dynamic_system.aux_list_clauses) else: self.__dynamic_constraints = \ forward_init_dynamic(self.dynamic_system.list_clauses, self.__var_code_table, self.__shift_step) def __backward_init_dynamic(self): """Dynamics initialisations. (never used, backward is partially implemented) Set dynamic constraints for a forward one step: X0 = f(X1) .. note:: Called by init_backward_unfolding() """ num_clause_list = \ [self.__backward_code(clause) for clause in self.dynamic_system.list_clauses] if self.__include_aux_clauses: num_clause_list += \ [self.__backward_code(clause) for clause in self.dynamic_system.aux_list_clauses] self.__dynamic_constraints = [num_clause_list] ## Shifting variables: implementation of the shift operator ################ def __shift_clause(self, ncl): """Shift a clause for the current __shift_step (no used anymore, replaced by direct use of C++ code) Basically, `shift_step` is added to positive variables and subtracted from negative variables in `numeric_clause`. self.shift_step must be frozen in order to avoid problems during the unflatten() step of RawSolutions. => Each step MUST have the same number of variables. Thus, we lock the Unfolder by turning self.__locked to True. @param ncl: DIMACS clause ## TODO rename @warning: lock the unfolder """ # Froze unfolder to avoid modifications of shift_step self.__locked = True # Old API # Less efficient with abs() # return [(abs(lit) + self.__shift_step) * (-1 if lit < 0 else 1) for lit in ncl] # More efficient with ternary assignment # return [(lit + self.__shift_step) if lit > 0 else (lit - self.__shift_step) # for lit in ncl] # New API via C++ module return shift_clause(ncl, self.__shift_step) def __m_shift_clause(self, ncl, nb_steps): """Shift a clause for the given step number self.shift_step must be frozen in order to avoid problems during the unflatten() step of RawSolutions. => Each step MUST have the same number of variables. Thus, we lock the Unfolder by turning self.__locked to True. Called by: __shift_variant() @param ncl: DIMACS clause @param nb_steps: number of shifts asked @warning: lock the unfolder """ # Froze unfolder to avoid modifications of shift_step self.__locked = True return [(lit + self.__shift_step * nb_steps) if lit > 0 else (lit - self.__shift_step * nb_steps) for lit in ncl] def __shift_dynamic(self): """Shift clauses representing the dynamics X' = f(X,I,C) Shift the last item of self.__dynamic_constraints and append the result to self.__dynamic_constraints. .. note:: Called by shift() """ # Old API # self.__dynamic_constraints.append( # [self.__shift_clause(clause) # for clause in self.__dynamic_constraints[-1]] # ) # New API via C++ module # __dynamic_constraints: # List of lists of DIMACS encoded clauses (lists of ints) self.__dynamic_constraints.append( shift_dynamic( self.__dynamic_constraints[-1], # >> self.__shift_step ) ) def __shift_initial(self): """Shift initialisation condition .. note:: Called by: - shift() - init_backward_unfolding() """ self.__initial_constraints = \ shift_dynamic( self.__initial_constraints, self.__shift_step ) def __shift_final(self): """Shift final property .. note:: Called by: - shift() - init_forward_unfolding() """ self.__final_constraints = \ shift_dynamic( self.__final_constraints, self.__shift_step ) def __shift_invariant(self): """Shift invariant property .. note:: Called by: - shift() - init_forward_unfolding() - init_backward_unfolding() """ if self.__invariant_constraints: # New API via C++ module self.__invariant_constraints.append( shift_dynamic( self.__invariant_constraints[-1], self.__shift_step ) ) def __shift_variant(self): """Shift variant property - depends on unfolding direction ## TODO audit... .. warning:: Refactor note: unclear function. No apparent reason for only the first clause to be shifted... Process: - Take the clauses (constraint) of the current step - Shift the first clause - Append it to __variant_constraints - Stop ???? Example: current_step: 2 (shift for step 3) [[], [[4], [4]], [[6], [7]], []] => Only [6] is shifted... .. note:: Called by shift() """ if not self.__list_variant_constraints: return if self.__shift_direction == "FORWARD": # Constraint t0 already included from the query during initialisation current_constraint = self.__list_variant_constraints[self.__current_step] # shift constraint at current step and add to var_constraints for clause in current_constraint: s_clause = self.__m_shift_clause(clause, self.__current_step) self.__variant_constraints.append(s_clause) return # ????? elif self.__shift_direction == "BACKWARD": raise MCLException("Not yet implemented") else: raise MCLException("Shift incoherent data: " + self.__shift_direction) def shift(self): """Shift all clauses of one step self.__current_step is incremented here! Clauses shifted: - __dynamic_constraints[-1] (last element) - __invariant_constraints[-1] (last element) - __list_variant_constraints => TODO: audit... - __final_constraints if __shift_direction is "FORWARD" - __initial_constraints if __shift_direction is "BACKWARD" """ # Froze unfolder to avoid modifications of shift_step self.__locked = True self.__shift_dynamic() self.__shift_invariant() self.__shift_variant() if self.__shift_direction == 'FORWARD': self.__shift_final() elif self.__shift_direction == 'BACKWARD': self.__shift_initial() else: # Shift direction must be set raise MCLException("Shift incoherent data: " + self.__shift_direction) # Increment the current step self.__current_step += 1 ## Coding of properties #################################################### def __compile_property(self, property_text): """Compile a property (logical formula) into clauses Type checking uses the symbol table of dyn_sys Error reporting is made through the dyn_sys reporter .. warning:: MODIFIES __lit_cpt for numbering of auxiliary variables (not coding) .. note:: Called by: - __init_initial_constraint_0() - __init_final_constraint_0() - __init_invariant_constraint_0() """ if self.__locked: raise MCLException("Trying to compile property while unfolder is locked") # syntax analyser and type checker # property_text, symb_t, reporter tree_prop = compile_cond(property_text, self.__dyn_sys.symb_tab, self.__dyn_sys.report) # avoid name collisions of aux var prop_visitor = CLPropertyVisitor(self.__lit_cpt) tree_prop.accept(prop_visitor) self.__lit_cpt = prop_visitor.cpt # avoid name collisions of aux var return prop_visitor.clauses def __compile_event(self, property_text): """Compile an event (biosignal expression) into clauses Type checking uses the symbol table of dyn_sys Error reporting is made through the dyn_sys reporter .. warning:: MODIFIES __lit_cpt for numbering of auxiliary variables (not coding) .. note:: Called by: - __init_variant_constraints_0() """ if self.__locked: raise MCLException("Trying to compile property while unfolder is locked") # Syntax analyser and type checker tree_prop, ste, fcl = compile_event( property_text, self.dynamic_system.symb_tab, True, self.dynamic_system.report ) # Avoid name collisions of aux var prop_visitor = CLPropertyVisitor(self.__lit_cpt) tree_prop.accept(prop_visitor) self.__lit_cpt = prop_visitor.cpt # avoid name collisions of aux var return prop_visitor.clauses def __init_initial_constraint_0(self, no_frontier_init=True): """ if initial property is set, generate constraints clauses in numeric form base variable form - we have to wait until all variables are num coded before shifting anything!! """ # OLD # self.__initial_constraints = [] # if no_frontier_init: # # initialize no frontier places to False # for nfp in self.__no_frontier_init: # self.__initial_constraints.append(nfp) # if self.__initial_property: # # compile initial property # l_clauses = self.__compile_property(self.__initial_property) # # compile numeric form # for clause in l_clauses: # self.__initial_constraints.append(self.__code_clause(clause)) # # dimacs aux initial properties # dim_init = self.__dim_initial # if dim_init: # self.__initial_constraints = self.__initial_constraints + dim_init self.__initial_constraints = list() # initialize no frontier places to False if no_frontier_init: self.__initial_constraints += [elem for elem in self.__no_frontier_init] if self.__initial_property: # compile initial property # compile numeric form self.__initial_constraints += [self.__code_clause(clause) for clause in self.__compile_property(self.__initial_property)] # dimacs aux initial properties if self.__dim_initial: self.__initial_constraints += self.__dim_initial def __init_final_constraint_0(self): """ if final property is set, generate constraints clauses in numeric form base variable used - we have to wait until all variables are num coded before shifting anything!! """ self.__final_constraints = [] if self.__final_property: # compile initial (X0) property # ex: Px => [$Px$] l_clauses = self.__compile_property(self.__final_property) # compile numeric form # ex: [$Px$] => self.__final_constraints = [[7]] for clause in l_clauses: self.__final_constraints.append(self.__code_clause(clause)) dim_fin = self.__dim_final if dim_fin: self.__final_constraints = self.__final_constraints + dim_fin def __init_invariant_constraint_0(self): """ if trajectory property is set, generate constraints clauses in numeric form base variable used - we have to wait until all variables are num coded before shifting anything!! """ self.__invariant_constraints = [] if self.__invariant_property: # compile initial (X0) property l_clauses = self.__compile_property(self.__invariant_property) # compile numeric form init_const = [] for clause in l_clauses: init_const.append(self.__code_clause(clause)) self.__invariant_constraints.append(init_const) d_inv = self.__dim_invariant if d_inv: self.__invariant_constraints = self.__invariant_constraints + d_inv def __init_variant_constraints_0(self): """Code variant constraints in a numerical clause. variant_property list d'étapes, chaque étape a une formule logique impliquant les évènements (et les places? pas sur.;) => ensemble de place/propriétés devant ^etre valides à chaque étape If variant_property is set, compile each property into clauses and encode these clauses. Clauses already in dimacs form are added. The two variant constraint forms must be compatible (same length) self.__list_variant_constraints is built here and is used later to shift given variant properties in __shift_variant(). """ # coding of variant properties # Take __variant_property in priority on __dimacs_variant if self.__variant_property: self.__list_variant_constraints = list() for prop in self.__variant_property: # compile initial (X0) property into numeric form # For each property in step of __variant_property self.__list_variant_constraints.append( [self.__code_clause(clause) for clause in self.__compile_event(prop)] ) print(self.__list_variant_constraints) if self.__dimacs_variant: # Add DIMACS aux clauses initial properties # Check if the number of steps is equal if len(self.__variant_property) != len(self.__dimacs_variant): raise MCLException( "Incoherent variant properties; sizes of " "__variant_property and __dimacs_variant differ" ) # Merge together all steps content # Ex: # __list_variant_constraints due to __variant_property convertion: # [[], [[4]], [], []] # __dimacs_variant: # [[], [[4]], [[6], [7]], []] # Result: # [[], [[4], [4]], [[6], [7]], []] self.__list_variant_constraints = \ [list(it.chain(*cpl)) for cpl in zip(self.__dimacs_variant, self.__list_variant_constraints)] elif self.__dimacs_variant: # No __variant_property, keep only __dimacs_variant self.__list_variant_constraints = self.__dimacs_variant # Initialisation if self.__list_variant_constraints: if self.__shift_direction == "FORWARD": # For now, keep only the events of the first step (t0) # Other steps are taken during shift() calls self.__variant_constraints = self.__list_variant_constraints[0] elif self.__shift_direction == "BACKWARD": raise MCLException("Not yet implemented") else: raise MCLException("Shift incoherent data: " + self.__shift_direction) ## Whole initialisations ################################################### def init_forward_unfolding(self): """Initialisation before generating constraints - forward trajectory Called at the begining of squery_is_satisfied() and squery_solve() """ self.__shift_direction = 'FORWARD' self.__current_step = 1 self.__shift_step = self.shift_step_init # back to basic! self.__aux_code_table = dict() # flush auxiliary variables self.__aux_list = [] # idem # init properties to generate all variable num codes self.__init_initial_constraint_0() self.__init_final_constraint_0() self.__init_invariant_constraint_0() self.__init_variant_constraints_0() # now shifting is possible self.__forward_init_dynamic() self.__shift_final() self.__shift_invariant() def init_backward_unfolding(self): """Initialisation before generating constraints - backward trajectory Never used (and backward not implemented in __init_variant_constraints_0) """ self.__shift_direction = 'BACKWARD' self.__current_step = 0 self.__shift_step = self.shift_step_init # back to basic! self.__aux_code_table = dict() # flush auxiliary variables self.__aux_list = [] # idem # init properties to generate all variable num codes self.__init_initial_constraint_0() self.__init_final_constraint_0() self.__init_invariant_constraint_0() self.__init_variant_constraints_0() # now shifting is possible self.__backward_init_dynamic() self.__shift_initial() self.__shift_invariant() ## Solver interface ######################################################## def __load_solver(self, solv): """Add all the current clauses in the solver .. note:: Called by: - __constraints_satisfied() - __msolve_constraints() """ # New API via C++ module solv.add_clauses(self.__final_constraints) solv.add_clauses(it.chain(*self.__invariant_constraints)) solv.add_clauses(self.__variant_constraints) solv.add_clauses(it.chain(*self.__dynamic_constraints)) solv.add_clauses(self.__initial_constraints) # LOGGING if LOGGER.getEffectiveLevel() == DEBUG: # final LOGGER.debug("Load new solver !!") LOGGER.debug(">> final constraints: %s", len(self.__final_constraints)) LOGGER.debug(str(self.__final_constraints)) # trajectory invariant LOGGER.debug(">> trajectory invariant constraints: %s", len(self.__invariant_constraints)) LOGGER.debug(str(self.__invariant_constraints)) # trajectory variant LOGGER.debug(">> trajectory variant constraints: %s", len(self.__variant_constraints)) LOGGER.debug(str(self.__variant_constraints)) # dynamics LOGGER.debug(">> dynamic constraints: %s", len(self.__dynamic_constraints)) LOGGER.debug(str(self.__dynamic_constraints)) # initial LOGGER.debug(">> initial constraints: %s", len(self.__initial_constraints)) LOGGER.debug(str(self.__initial_constraints)) def __constraints_satisfied(self): """Ask the SAT solver if the query/system is satisfiable in the given number of steps. .. warning:: vvars (frontier values) are not tested. Thus the satisfiability is not tested for interesting variables like in squery_solve(). :return: True if the problem is satisfiable, False otherwise. :rtype: """ solver = CryptoMS() # Load all clauses into the solver self.__load_solver(solver) if self.__stats: # If stats are activated (never besides in test environment): # sync local data (nb vars, nb clauses with solver state) if solver.nb_vars() > self.__nb_vars: self.__nb_vars = solver.nb_vars() # Starting from pycryptosat 5.2, nb_clauses() is a private attribute #if solver.nb_clauses() > self.__nb_clauses: # self.__nb_clauses = solver.nb_clauses() # Is the problem satisfiable ? return solver.is_satisfiable() def __msolve_constraints(self, max_sol, vvars): """Get solutions from the solver :param max_sol: The maximum number of solutions to be returned :param vvars: Variables for which solutions must differ. In practice, it is a list with the values (Dimacs code) of all frontier places of the system. :type max_sol: :type vvars: > :return: A tuple of RawSolution objects RawSolution objects contain a solution got from SAT solver with all variable parameters from the unfolder :rtype: > """ solver = CryptoMS() # Load all clauses into the solver self.__load_solver(solver) if self.__stats: # If stats are activated (never besides in test environment): # sync local data (nb vars, nb clauses with solver state) if solver.nb_vars() > self.__nb_vars: self.__nb_vars = solver.nb_vars() # Starting from pycryptosat 5.2, nb_clauses() is a private attribute #if solver.nb_clauses() > self.__nb_clauses: # self.__nb_clauses = solver.nb_clauses() if LOGGER.getEffectiveLevel() == DEBUG: LOGGER.debug("__msolve_constraints :: vvars : %s", vvars) LOGGER.debug("__msolve_constraints :: max_sol : %s", max_sol) # Get List of solutions (list of tuples of literals) lintsol = solver.msolve_selected(max_sol, vvars) LOGGER.debug("__msolve_constraints :: lintsol : %s", lintsol) # Make a RawSolution for each solution (tuple of literals) return [RawSolution(solint, self) for solint in lintsol] return tuple(RawSolution(solint, self) for solint in solver.msolve_selected(max_sol, vvars)) def squery_is_satisfied(self, max_step): """Ask the SAT solver if the query/system is satisfiable in the given number of steps. If __invariant_property is set and __final_property is not set, 1 satisfiability test is made only after all shift() calls for each remaining step from the current step. Otherwise, 1 test is made for each remaining step after each shift() call. .. warning:: vvars (frontier values) are not tested. Thus the satisfiability is not tested for interesting variables like in squery_solve(). :param max_step: The horizon on which the properties must be satisfied. :type max_step: """ # Initialization self.init_forward_unfolding() # Horizon adjustment if self.__list_variant_constraints: max_step = min(max_step, len(self.__list_variant_constraints)-1) ## TODO: -1 ? if self.__invariant_property and not self.__final_property: # 1 satisfiability test is made only after all shift() calls for each step # Shift all remaining steps while self.__current_step <= max_step: self.shift() return self.__constraints_satisfied() else: # 1 test is made for each remaining step after each shift() # Check and correct for incoherent data on step number # __steps_before_check can't be >= max_step, otherwise # __current_step could be > max_step due to shift() of "auto shift loop" # and this function will not search any solution, # and will return an empty list. if self.__steps_before_check >= max_step: self.__steps_before_check = max_step - 1 # Auto shift loop: Shift without checking (optimization) # Do not search solutions before the specified limit while self.__current_step < self.__steps_before_check: print("SATIS TEST AUTO SHIFT opti") self.shift() satisfiability_test = False while self.__current_step < max_step: # Shift the system self.shift() # Test satisfiability satisfiability_test = self.__constraints_satisfied() print("SATIS TEST done, current_step", self.__current_step) if satisfiability_test: return True # Return the satisfiability status in all cases return satisfiability_test def squery_solve(self, vvars, max_step, max_sol): """Search max_sol number of solutions with max_step steps, involving vvars (frontiers) variables of interest. Assert a query is loaded (start_condition, inv_condition, final_condition) If __invariant_property is set and __final_property is not set, 1 unique solution search is made only after all shift() calls for each remaining step from the current step. Otherwise, 1 test is made for each remaining step after each shift() call. :param max_step: bounded horizon for computations :param max_sol: The maximum number of solutions to be returned :param vvars: Variables for which solutions must differ. In practice, it is a list with the values (Dimacs code) of all frontier places of the system. :type max_step: :type max_sol: :type vvars: > :return: RawSolution objects RawSolution objects contain a solution got from SAT solver with all variable parameters from the unfolder :rtype: > """ if LOGGER.getEffectiveLevel() == DEBUG: LOGGER.debug("squery_solve :: vvars : %s", vvars) # Initialization self.init_forward_unfolding() # Horizon adjustment if self.__list_variant_constraints: max_step = min(max_step, len(self.__list_variant_constraints)-1) ## TODO: -1 ? if self.__invariant_property and not self.__final_property: # Shift all remaining steps while self.__current_step <= max_step: self.shift() # Solve the problem return self.__msolve_constraints(max_sol, vvars) else: # Check and correct for incoherent data on step number # __steps_before_check can't be >= max_step, otherwise # __current_step could be > max_step due to shift() of "auto shift loop" # and this function will not search any solution, # and will return an empty list. if self.__steps_before_check >= max_step: self.__steps_before_check = max_step - 1 # Auto shift loop: Shift without checking (optimization) # Do not search solutions before the specified limit print("squery_solve: begining; current_step:", self.__current_step, "max_step:", max_step) while self.__current_step < self.__steps_before_check: print("AUTO SHIFT opti") self.shift() # Search for solutions for each remaining step # from __current_step = __steps_before_check # (__current_step is incremented by the last shift() above)... # ... to max_step excluded. # Ex: __steps_before_check = 3, __current_step = 3, max_step = 4 # => we search solutions only for 1 step (step 4) raw_solutions = list() while self.__current_step < max_step: self.shift() temp_raw_solutions = self.__msolve_constraints(max_sol, vvars) print("Nb raw_solutions:", len(temp_raw_solutions), "steps_before_check", self.__steps_before_check, "max_step", max_step, "current_step", self.__current_step ) #raw_input("pause") # When __steps_before_check is 0 by default, or less than # (max_step - 1), we need to search solutions for every steps # until max_step is reached. # Otherwise, __steps_before_check is set to (maxstep - 1) only # to allow us to search solutions for the last step (max_step). if temp_raw_solutions: raw_solutions += temp_raw_solutions return raw_solutions ############################################################################### class CLPropertyVisitor(object): """ SigExpression on states and events into a set of clauses Type checking must be done. """ def __init__(self, aux_cpt=0): self.cpt = aux_cpt # for auxiliary variable naming self.clauses = [] # generated clauses self.top = True # root of the formula? def visit_sig_ident(self, tnode): """ ident -> literal """ name = tnode.name new_lit = Literal(name, True) if self.top: self.clauses.append(Clause([new_lit])) return None return new_lit def visit_sig_not(self, node): """ translate a not """ top = self.top self.top = False # compile operand to nl <=> formula newl = node.operand.accept(self) notnl = newl.lit_not() if top: # we are at the root self.clauses.append(Clause([notnl])) return None return notnl # if we are not at the root def visit_sig_sync(self, binnode): """ translate a binary (or/and) expression """ top = self.top self.top = False # the internal nodes will be affected with the name '_lit' name = '_lit'+str(self.cpt) self.cpt += 1 newl = Literal(name, True) notnl = Literal(name, False) # recursive visits operator = binnode.operator nl1 = binnode.left_h.accept(self) notnl1 = nl1.lit_not() nl2 = binnode.right_h.accept(self) notnl2 = nl2.lit_not() # clauses generation if operator == 'and': # x = lh and rh self.clauses.extend([ Clause([nl1, notnl]), # not x or not lh Clause([nl2, notnl]), # not x or not rh Clause([notnl1, notnl2, newl]) # x or not lh or not rh ]) if operator == 'or': # x = lh or rh self.clauses.extend([ Clause([notnl1, newl]), # x or not lh Clause([notnl2, newl]), # x or not rh Clause([nl1, nl2, notnl]) # not x or not lh or not rh ]) if top: self.clauses.append(Clause([newl])) return None return newl def visit_sig_const(self, exp): """ translate a constant expression """ top = self.top if top: if exp.is_const_false(): raise TypeError("No constant False property allowed") else: # always present or satisfied return None # the internal nodes will be affected with the name '_lit' name = '_lit'+str(self.cpt) self.cpt += 1 newl = Literal(name, True) # clause generation if exp.value: lit = newl # x = True else: lit = newl.lit_not() self.clauses.append(Clause([lit])) return newl # time operators def visit_sig_default(self, exp): """ default translation """ top = self.top self.top = False # the internal nodes will be affected the name '_lit' name = '_lit'+str(self.cpt) self.cpt += 1 newl = Literal(name, True) notnl = Literal(name, False) # recursive visits nl1 = exp.left_h.accept(self) notnl1 = nl1.lit_not() nl2 = exp.right_h.accept(self) notnl2 = nl2.lit_not() # clause generation: x = lh or rh cl1 = Clause([notnl1, newl]) # x or not lh cl2 = Clause([notnl2, newl]) # x or not rh cl3 = Clause([nl1, nl2, notnl]) # not x or not lh or not rh self.clauses.append(cl1) self.clauses.append(cl2) self.clauses.append(cl3) if top: self.clauses.append(Clause([newl])) return None return newl def visit_sig_when(self, exp): """ when translation """ top = self.top self.top = False # the internal nodes will be affected the name '_lit' name = '_lit'+str(self.cpt) self.cpt += 1 newl = Literal(name, True) notnl = Literal(name, False) # recursive visits nl1 = exp.left_h.accept(self) notnl1 = nl1.lit_not() nl2 = exp.right_h.accept(self) notnl2 = nl2.lit_not() # clause generation: x = lh and rh cl1 = Clause([nl1, notnl]) # not x or not lh cl2 = Clause([nl2, notnl]) # not x or not rh cl3 = Clause([notnl1, notnl2, newl]) # x or not lh or not rh self.clauses.append(cl1) self.clauses.append(cl2) self.clauses.append(cl3) if top: self.clauses.append(Clause([newl])) return None return newl