#!/usr/bin/python3
# -*- coding: utf-8 -*-

from lib import *
from lib.utilities import *
from operator import attrgetter
import itertools as it


class AbstractOntologyLinker:
    """Shared machinery for extracting link keys between ontology classes.

    Holds the ontology, the link keys found so far (keyed by a pair of class
    identifiers) and the quantifiers used to build candidate properties.
    """

    def __init__(self, ontology):
        """Store the ontology and start with no link key and no handled class."""
        self._ogy = ontology
        self._linkkeys = Dict()
        self._handled_classes = None
        self._quantifiers = ('∃', '∀', '∀∃')

    @property
    def ogy(self):
        """The ontology given at construction."""
        return self._ogy

    @property
    def linkkeys(self):
        """The link keys registered so far, keyed by ``(class_0, class_1)``."""
        return self._linkkeys

    @property
    def quantifiers(self):
        """The quantifiers combined with property pairs to form attributes."""
        return self._quantifiers

    @quantifiers.setter
    def quantifiers(self, value):
        self._quantifiers = value

    def check_sets_equality(self, set_0, set_1, quantifier):
        """Compare two sets according to ``quantifier``.

        * ``'∃'`` / ``'E'``: the sets share at least one element.
        * ``'∀'`` / ``'F'``: the sets are equal.
        * ``'∀∃'`` / ``'FE'``: the sets are equal and non-empty.

        Any other quantifier is reported on stdout and yields ``False``.
        """
        if quantifier in {'∃', 'E'}:
            return len(set_0 & set_1) > 0
        if quantifier in {'∀', 'F'}:
            return set_0 == set_1
        if quantifier in {'∀∃', 'FE'}:
            return set_0 == set_1 and len(set_0) > 0
        print('Unknown quantifier %s' % quantifier)
        return False

    def check_val_equality(self, inst_0, inst_1, qprop_01):
        """Compare the values of two instances for a quantified property pair.

        ``qprop_01`` is ``(quantifier, property_0, property_1)``.
        """
        vals_0 = inst_0.values_for_property(qprop_01[1])
        vals_1 = inst_1.values_for_property(qprop_01[2])
        return self.check_sets_equality(vals_0, vals_1, qprop_01[0])

    def check_obj_equality(self, inst_0, inst_1, qprop_01):
        """Compare the objects two instances reach through a property pair.

        Returns ``True`` immediately when the ontology reports the two
        instances as the same, and ``False`` when no link key is registered
        for the classes targeted by the two properties. Otherwise, a pair of
        target objects is considered matching when every key of that link key
        holds for it; the matched objects on each side are then compared with
        the full object sets under the quantifier ``qprop_01[0]``.
        """
        if self.ogy.has_same_as(inst_0, inst_1):
            return True
        c0_id = inst_0.class_for_property(qprop_01[1])
        c1_id = inst_1.class_for_property(qprop_01[2])
        linkkey = self._linkkeys.get((c0_id, c1_id))
        if linkkey is None:
            return False
        inst_ids_0 = inst_0.objects_for_property(qprop_01[1])
        inst_ids_1 = inst_1.objects_for_property(qprop_01[2])
        inter_0, inter_1 = set(), set()
        for inst_id_01 in it.product(inst_ids_0, inst_ids_1):
            # all() stops at the first failing key instead of evaluating
            # every remaining (possibly recursive) relation check.
            if all(self.check_relation(inst_id_01, q01_key)
                   for q01_key in linkkey.keys):
                inter_0.add(inst_id_01[0])
                inter_1.add(inst_id_01[1])
        quantifier = qprop_01[0]
        return (self.check_sets_equality(inter_0, inst_ids_0, quantifier)
                and self.check_sets_equality(inter_1, inst_ids_1, quantifier))

    def check_relation(self, inst_ids_01, qprop_01):
        """Tell whether a pair of instances satisfies a quantified property pair.

        ``inst_ids_01`` is a pair of instance identifiers and ``qprop_01`` is
        ``(quantifier, property_0, property_1)``. The pair is checked as value
        properties when both properties are value properties of their
        respective classes, as object properties when both are object
        properties, and is rejected otherwise.
        """
        inst_0 = self._ogy.get_instance(inst_ids_01[0])
        inst_1 = self._ogy.get_instance(inst_ids_01[1])
        class_0 = self._ogy.get_class(inst_0.class_id)
        class_1 = self._ogy.get_class(inst_1.class_id)
        prop_0, prop_1 = qprop_01[1], qprop_01[2]
        if (prop_0 in class_0.val_property_ids
                and prop_1 in class_1.val_property_ids):
            return self.check_val_equality(inst_0, inst_1, qprop_01)
        if (prop_0 in class_0.obj_property_ids
                and prop_1 in class_1.obj_property_ids):
            return self.check_obj_equality(inst_0, inst_1, qprop_01)
        return False

    def measure_linkkey(self, c0_id, c1_id, concept):
        """Build the link key of ``concept`` and score it.

        The link key uses ``concept.alt_intent`` when it is set and
        ``concept.intent`` otherwise. When the extent is not empty:

        * ``discrimi`` = min(distinct left ids, distinct right ids) / |extent|
        * ``coverage`` = (distinct left ids + distinct right ids) / number of
          instances of both classes
        * ``fmeasure`` = harmonic mean of ``discrimi`` and ``coverage``

        The f-measure is also copied onto ``concept``.
        """
        insts_0 = self._ogy.get_instances(c0_id)
        insts_1 = self._ogy.get_instances(c1_id)
        intent = concept.intent
        if concept.alt_intent is not None:
            intent = concept.alt_intent
        linkkey = OntologyLinkkey(c0_id, c1_id, intent, concept.names)
        if len(concept.extent) != 0:
            nb_insts = len(insts_0) + len(insts_1)
            nb_links0 = len({val[0] for val in concept.extent})
            nb_links1 = len({val[1] for val in concept.extent})
            linkkey.discrimi = min(nb_links0, nb_links1) / len(concept.extent)
            linkkey.coverage = (nb_links0 + nb_links1) / nb_insts
            linkkey.fmeasure = 2.0 * (linkkey.discrimi * linkkey.coverage)
            linkkey.fmeasure /= (linkkey.discrimi + linkkey.coverage)
        # NOTE(review): the original indentation was lost; this assignment is
        # assumed to run for every concept, which relies on OntologyLinkkey
        # providing a default fmeasure - confirm.
        concept.fmeasure = linkkey.fmeasure
        return linkkey

    def add_linkkey(self, linkkey):
        """Register ``linkkey`` under its ``(class_0, class_1)`` pair."""
        key = (linkkey.class_0, linkkey.class_1)
        self._linkkeys.set(key, linkkey)

    def remove_linkkey(self, linkkey):
        """Unregister the link key stored for ``linkkey``'s class pair."""
        key = (linkkey.class_0, linkkey.class_1)
        self._linkkeys.pop(key)


class FCAOntologyLinker(AbstractOntologyLinker):
    """Link key extraction that builds one formal context per class alignment."""

    def __init__(self, ontology, alignments):
        """Store the ontology and the ``(class_0, class_1)`` pairs to process."""
        AbstractOntologyLinker.__init__(self, ontology)
        self.alignments = alignments

    def next_alignment(self):
        """Return the next alignment whose dependencies are all handled.

        Alignments whose two classes are already handled are skipped.
        Returns ``None`` when no alignment is ready.
        """
        handled = self._handled_classes
        if handled is None:
            # Nothing has been handled before extract_linkkeys() ran.
            handled = set()
        for (c0_id, c1_id) in self.alignments:
            if c0_id in handled and c1_id in handled:
                continue
            # Subtract into a fresh set: an in-place ``-=`` would alter the
            # object handed out by the ontology.
            pending = set(self._ogy.get_dependencies(c0_id, c1_id)) - handled
            if len(pending) == 0:
                return c0_id, c1_id
        return None

    def find_linkkey(self, c0_id, c1_id):
        """Build the context and lattice of a class pair and pick a link key.

        Objects of the context are all pairs of instances of the two classes;
        attributes are all ``(quantifier, property_0, property_1)`` triples
        over value properties, followed by those over object properties.

        Returns ``(context, lattice, linkkey)`` where ``linkkey`` has the
        highest f-measure among the link keys of the lattice concepts.
        """
        class0, class1 = self._ogy.get_class(c0_id), self._ogy.get_class(c1_id)
        qvprop_01_set = it.product(self.quantifiers,
                                   class0.val_property_ids,
                                   class1.val_property_ids)
        qoprop_01_set = it.product(self.quantifiers,
                                   class0.obj_property_ids,
                                   class1.obj_property_ids)
        qprop_01_set = list(qvprop_01_set) + list(qoprop_01_set)
        inst_ids_01 = list(it.product(class0.instance_ids,
                                      class1.instance_ids))
        identifier = "K_%s_%s" % (c0_id, c1_id)
        context = FormalContext(identifier, inst_ids_01, qprop_01_set)
        context.clean_lattice = True
        for inst_id_01, qprop_01 in it.product(inst_ids_01, qprop_01_set):
            if self.check_relation(inst_id_01, qprop_01):
                context.add_relation(inst_id_01, qprop_01)
        lattice = context.build_lattice()
        linkkeys = [self.measure_linkkey(c0_id, c1_id, c) for c in lattice]
        return context, lattice, max(linkkeys, key=attrgetter('fmeasure'))

    def extract_linkkeys(self, exporter=None):
        """Process every ready alignment and return the registered link keys.

        Classes of already registered link keys count as handled from the
        start. Each processed alignment registers its best link key and marks
        its two classes as handled. When ``exporter`` is given, the context
        and lattice of each processed alignment are exported.
        """
        self._quantifiers = guard_operators(self._quantifiers)
        self._handled_classes = set()
        for linkkey in self.linkkeys.values():
            self._handled_classes |= linkkey.classes
        alignment = self.next_alignment()
        while alignment is not None:
            c1_id, c2_id = alignment
            context, lattice, linkkey = self.find_linkkey(c1_id, c2_id)
            self._linkkeys.set(tuple(alignment), linkkey)
            self._handled_classes |= set(alignment)
            alignment = self.next_alignment()
            # NOTE(review): the original indentation was lost; the export is
            # assumed to happen once per processed alignment - confirm.
            if exporter is not None:
                exporter.export_fca_context(context)
                exporter.export_lattice(lattice)
        return self.linkkeys


class RCAOntologyLinker(AbstractOntologyLinker):
    """Link key extraction based on formal and relational contexts."""

    def __init__(self, ontology):
        """Store the ontology and start with no context."""
        AbstractOntologyLinker.__init__(self, ontology)
        self.fc_names = DDict()
        self.fcontexts, self.rcontexts = {}, {}

    def build_formal_contexts(self):
        """Build one formal context per pair of classes of the two sides.

        Objects are pairs of instances; attributes are
        ``(quantifier, property_0, property_1)`` triples over value
        properties. Every pair gets a name in ``fc_names``; only non-empty
        contexts are kept in ``fcontexts``.
        """
        c0_ids, c1_ids = self.ogy.classes0, self.ogy.classes1
        for (c0_id, c1_id) in it.product(c0_ids, c1_ids):
            c0, c1 = self.ogy.get_class(c0_id), self.ogy.get_class(c1_id)
            vprops_q01 = list(it.product(self.quantifiers,
                                         c0.val_property_ids,
                                         c1.val_property_ids))
            insts_01 = list(it.product(c0.instance_ids, c1.instance_ids))
            fc_name = "K_%s_%s" % (c0.class_id, c1.class_id)
            self.fc_names.set((c0.class_id, c1.class_id), fc_name)
            context = FormalContext(fc_name, insts_01, vprops_q01)
            for inst01, vprop_q01 in it.product(insts_01, vprops_q01):
                if self.check_relation(inst01, vprop_q01):
                    context.add_relation(inst01, vprop_q01)
            if not context.is_empty:
                self.fcontexts[fc_name] = context

    def build_relational_contexts(self):
        """Build relational contexts between the kept formal contexts.

        For each ordered pair of named class pairs ``(c0, c1)`` and
        ``(c2, c3)`` such that ``c2`` is a dependency of ``c0`` and ``c3`` a
        dependency of ``c1``, one relational context is built per pair of
        linking object properties. Only non-empty ones are kept in
        ``rcontexts``, under the property names joined with ``'_'``.
        """
        classes_ids_prod = it.product(self.fc_names.keys(), repeat=2)
        for ((c0_id, c1_id), (c2_id, c3_id)) in classes_ids_prod:
            c0, c1 = self.ogy.get_class(c0_id), self.ogy.get_class(c1_id)
            if c2_id not in c0.dependencies or c3_id not in c1.dependencies:
                continue
            # These lookups do not depend on the property pair, so they are
            # resolved once per class combination.
            fc0_name = self.fc_names.get_value((c0_id, c1_id))
            fc1_name = self.fc_names.get_value((c2_id, c3_id))
            fc0 = self.fcontexts.get(fc0_name, None)
            fc1 = self.fcontexts.get(fc1_name, None)
            if fc0 is None or fc1 is None:
                continue
            oprops_0 = self.ogy.linked_classes.get((c0_id, c2_id))
            oprops_1 = self.ogy.linked_classes.get((c1_id, c3_id))
            for attr01 in it.product(oprops_0, oprops_1):
                rc_name = '_'.join(attr01)
                rcontext = RelationalContext(rc_name, fc0_name, fc1_name)
                rcontext.metadata = str(attr01)
                for (o01, v01) in it.product(fc0.objects, fc1.objects):
                    if self.ogy.has_properties(o01, v01, attr01):
                        rcontext.add_relation(o01, v01)
                if not rcontext.is_empty:
                    self.rcontexts[rc_name] = rcontext

    def search_linkkeys(self, lattices):
        """Score every combination made of one concept per lattice.

        A combination is kept when the dependencies of its concepts are all
        among the names of its concepts. Its score is the mean f-measure of
        the link keys of its concepts.

        Returns ``(max_i, vector_linkkeys)``: the list of kept
        ``(link keys, score)`` pairs and the index of the best score (the
        last one on ties). ``max_i`` is 0 when the list is empty.
        """
        vector_linkkeys, max_i = [], 0
        for combinations in it.product(*lattices.values()):
            ccpt_ids, ccpt_dep = set(), set()
            vector, score = [], 0.0
            for concept in combinations:
                if concept.names is not None:
                    ccpt_ids |= concept.names
                if concept.dependencies is not None:
                    ccpt_dep |= concept.dependencies
                (c0, c1) = self.fc_names.get_key(concept.context_id)
                vector.append(self.measure_linkkey(c0, c1, concept))
                score += vector[-1].fmeasure / len(combinations)
            if ccpt_dep <= ccpt_ids:
                vector_linkkeys.append((vector, score))
                # NOTE(review): the original indentation was lost; this test
                # is assumed to be nested so the list is never empty here.
                if vector_linkkeys[max_i][1] <= score:
                    max_i = len(vector_linkkeys) - 1
        return max_i, vector_linkkeys

    def extract_linkkeys(self, exporter):
        """Run the whole RCA pipeline and return the scored link key vectors.

        Returns an empty dict when the ontology has no instance or no
        property; otherwise returns ``(index, vectors)`` as produced by
        ``search_linkkeys``. When ``exporter`` is not ``None`` the lattices
        are exported again together with the selected link keys.
        """
        self._quantifiers = guard_operators(self._quantifiers)
        Logger.log('Ontology linker')
        Logger.log('Number of classes 0: %d' % len(self.ogy.classes0))
        Logger.log('Number of classes 1: %d' % len(self.ogy.classes1))
        Logger.log('Number of instances: %d' % self.ogy.nb_instances)
        Logger.log('Number of d-properties: %d' % self.ogy.nb_val_properties)
        Logger.log('Number of o-properties: %d' % self.ogy.nb_obj_properties)
        if self.ogy.nb_instances == 0 or self.ogy.nb_properties == 0:
            return {}
        Logger.log('Building formal contexts')
        self.build_formal_contexts()
        Logger.log('nb formal contexts:', len(self.fcontexts))
        Logger.log('Building relational contexts')
        self.build_relational_contexts()
        # This count is for the relational contexts, so label it as such.
        Logger.log('nb relational contexts:', len(self.rcontexts))
        Logger.log('Building RCA process')
        process = RCAProcess(self.fcontexts, self.rcontexts, self.quantifiers)
        process.clean_lattice = True
        Logger.log('Building lattices')
        lattices = process.build_lattices(exporter)
        Logger.log('Searching link keys')
        index, vectors = self.search_linkkeys(lattices)
        if exporter is not None:
            # This will reexport the lattices... with the compatible link keys
            exporter.export_lattices(lattices, index, vectors)
        return index, vectors