Mentions légales du service

Skip to content
Snippets Groups Projects
ogylinker.py 11.38 KiB
#!/usr/bin/python3
# -*- coding: utf-8 -*-

from lib import *
from lib.utilities import *
from operator import attrgetter
import itertools as it


class AbstractOntologyLinker:
    """Base class for linkers that compare instances and score link keys.

    The linker wraps an ontology object and keeps the link keys found so far,
    indexed by the ``(class_0, class_1)`` pair they apply to.  Property
    comparisons are driven by *quantified properties*: triples
    ``(quantifier, property_0, property_1)`` where the quantifier is one of
    the symbols handled by :meth:`check_sets_equality`.
    """

    # Quantifiers used when none are set explicitly: existential, universal,
    # and the "universal and non-empty" combination.
    DEFAULT_QUANTIFIERS = ('∃', '∀', '∀∃')

    def __init__(self, ontology):
        """Store the ontology and start with no link key registered."""
        self._ogy = ontology
        self._linkkeys = Dict()
        # Populated by the subclasses' extraction routines.
        self._handled_classes = None
        self._quantifiers = self.DEFAULT_QUANTIFIERS

    @property
    def ogy(self):
        """The ontology object given at construction time."""
        return self._ogy

    @property
    def linkkeys(self):
        """Container of the link keys registered so far."""
        return self._linkkeys

    @property
    def quantifiers(self):
        """Quantifiers combined with property pairs to build attributes."""
        return self._quantifiers

    @quantifiers.setter
    def quantifiers(self, value):
        self._quantifiers = value

    def check_sets_equality(self, set_0, set_1, quantifier):
        """Compare two sets under the given quantifier.

        * ``'∃'`` / ``'E'``: the sets share at least one element.
        * ``'∀'`` / ``'F'``: the sets are equal (two empty sets match).
        * ``'∀∃'`` / ``'FE'``: the sets are equal and not empty.

        Any other quantifier is reported on stdout and yields ``False``.
        """
        if quantifier in {'∃', 'E'}:
            return len(set_0 & set_1) > 0
        if quantifier in {'∀', 'F'}:
            return set_0 == set_1
        if quantifier in {'∀∃', 'FE'}:
            return set_0 == set_1 and len(set_0) > 0
        print('Unknown quantifier %s' % quantifier)
        return False

    def check_val_equality(self, inst_0, inst_1, qprop_01):
        """Compare the values two instances hold for a quantified property.

        ``qprop_01`` is ``(quantifier, property_0, property_1)``; the values
        of ``property_0`` on ``inst_0`` are compared with those of
        ``property_1`` on ``inst_1``.
        """
        quantifier, prop_0, prop_1 = qprop_01[0], qprop_01[1], qprop_01[2]
        vals_0 = inst_0.values_for_property(prop_0)
        vals_1 = inst_1.values_for_property(prop_1)
        return self.check_sets_equality(vals_0, vals_1, quantifier)

    def check_obj_equality(self, inst_0, inst_1, qprop_01):
        """Compare the objects two instances reach through a property pair.

        Returns ``True`` immediately when the ontology reports the two
        instances as the same.  Otherwise the objects reached through the
        two properties are matched pairwise using the link key registered
        for their classes; ``False`` is returned when no such link key
        exists.  The matched objects on each side are then compared with
        all the objects on that side under the quantifier of ``qprop_01``.
        """
        if self.ogy.has_same_as(inst_0, inst_1):
            return True

        quantifier, prop_0, prop_1 = qprop_01[0], qprop_01[1], qprop_01[2]
        c0_id = inst_0.class_for_property(prop_0)
        c1_id = inst_1.class_for_property(prop_1)
        linkkey = self._linkkeys.get((c0_id, c1_id))
        if linkkey is None:
            return False

        inst_ids_0 = inst_0.objects_for_property(prop_0)
        inst_ids_1 = inst_1.objects_for_property(prop_1)
        matched_0, matched_1 = set(), set()
        for pair in it.product(inst_ids_0, inst_ids_1):
            # A pair matches when every key of the link key holds; stop at
            # the first key that fails.
            if all(self.check_relation(pair, key) for key in linkkey.keys):
                matched_0.add(pair[0])
                matched_1.add(pair[1])

        return (self.check_sets_equality(matched_0, inst_ids_0, quantifier)
                and self.check_sets_equality(matched_1, inst_ids_1,
                                             quantifier))

    def check_relation(self, inst_ids_01, qprop_01):
        """Tell whether a pair of instances satisfies a quantified property.

        ``inst_ids_01`` is a pair of instance identifiers.  The comparison
        is value-based when both properties are value properties of the
        instances' classes, object-based when both are object properties,
        and ``False`` for any other combination.
        """
        inst_0 = self._ogy.get_instance(inst_ids_01[0])
        inst_1 = self._ogy.get_instance(inst_ids_01[1])
        class_0 = self._ogy.get_class(inst_0.class_id)
        class_1 = self._ogy.get_class(inst_1.class_id)
        prop_0, prop_1 = qprop_01[1], qprop_01[2]

        if prop_0 in class_0.val_property_ids and \
           prop_1 in class_1.val_property_ids:
            return self.check_val_equality(inst_0, inst_1, qprop_01)
        if prop_0 in class_0.obj_property_ids and \
           prop_1 in class_1.obj_property_ids:
            return self.check_obj_equality(inst_0, inst_1, qprop_01)
        return False

    def measure_linkkey(self, c0_id, c1_id, concept):
        """Build the link key of a concept and score it.

        The link key uses ``concept.alt_intent`` when it is set and
        ``concept.intent`` otherwise.  When the concept extent is not empty:

        * discriminability = min(distinct first members, distinct second
          members of the extent pairs) / number of extent pairs;
        * coverage = (distinct first members + distinct second members)
          / (instances of ``c0_id`` + instances of ``c1_id``);
        * f-measure = harmonic mean of the two.

        With an empty extent the measures keep whatever values
        ``OntologyLinkkey`` initialises them to.  The f-measure is also
        copied onto ``concept.fmeasure``.
        """
        insts_0 = self._ogy.get_instances(c0_id)
        insts_1 = self._ogy.get_instances(c1_id)
        intent = concept.intent
        if concept.alt_intent is not None:
            intent = concept.alt_intent
        linkkey = OntologyLinkkey(c0_id, c1_id, intent, concept.names)

        if len(concept.extent) != 0:
            nb_insts = len(insts_0) + len(insts_1)
            nb_links0 = len({val[0] for val in concept.extent})
            nb_links1 = len({val[1] for val in concept.extent})

            linkkey.discrimi = min(nb_links0, nb_links1) / len(concept.extent)
            linkkey.coverage = (nb_links0 + nb_links1) / nb_insts
            linkkey.fmeasure = 2.0 * (linkkey.discrimi * linkkey.coverage)
            linkkey.fmeasure /= (linkkey.discrimi + linkkey.coverage)
        concept.fmeasure = linkkey.fmeasure
        return linkkey

    def add_linkkey(self, linkkey):
        """Register a link key under its ``(class_0, class_1)`` pair."""
        key = (linkkey.class_0, linkkey.class_1)
        self._linkkeys.set(key, linkkey)

    def remove_linkkey(self, linkkey):
        """Drop the link key registered for its ``(class_0, class_1)`` pair."""
        key = (linkkey.class_0, linkkey.class_1)
        self._linkkeys.pop(key)


class FCAOntologyLinker(AbstractOntologyLinker):
    """Linker that processes a list of class alignments one pair at a time.

    For each aligned pair of classes a formal context is built over the
    pairs of their instances, its lattice is computed, and the concept with
    the highest f-measure provides the link key of the pair.
    """

    def __init__(self, ontology, alignments):
        """Store the ontology and the ``(class_0, class_1)`` alignments."""
        super().__init__(ontology)
        self.alignments = alignments

    def next_alignment(self):
        """Return the next alignment that is ready to be processed.

        An alignment is skipped when both of its classes are already
        handled.  It is ready when every class reported by
        ``get_dependencies`` for the pair has been handled.  Returns the
        ``(c0_id, c1_id)`` pair, or ``None`` when no alignment qualifies.
        """
        handled = self._handled_classes
        if handled is None:
            # Nothing handled yet (extract_linkkeys has not been run).
            handled = set()

        for (c0_id, c1_id) in self.alignments:
            if c0_id in handled and c1_id in handled:
                continue
            # Work on a copy: subtracting in place would alter the set
            # returned by the ontology if it hands out its own storage.
            pending = set(self._ogy.get_dependencies(c0_id, c1_id)) - handled
            if len(pending) == 0:
                return c0_id, c1_id
        return None

    def find_linkkey(self, c0_id, c1_id):
        """Compute the best link key between two classes.

        The formal context has the pairs of instances of the two classes as
        objects and every ``(quantifier, property_0, property_1)`` triple
        (value properties together, object properties together) as
        attributes.  Returns ``(context, lattice, linkkey)`` where
        ``linkkey`` is the candidate with the highest f-measure.

        Raises ``ValueError`` (from ``max``) if iterating the lattice
        yields no concept.
        """
        class0, class1 = self._ogy.get_class(c0_id), self._ogy.get_class(c1_id)
        qvprop_01_set = it.product(self.quantifiers,
                                   class0.val_property_ids,
                                   class1.val_property_ids)
        qoprop_01_set = it.product(self.quantifiers,
                                   class0.obj_property_ids,
                                   class1.obj_property_ids)
        qprop_01_set = list(qvprop_01_set) + list(qoprop_01_set)
        inst_ids_01 = list(it.product(class0.instance_ids,
                                      class1.instance_ids))

        identifier = "K_%s_%s" % (c0_id, c1_id)
        context = FormalContext(identifier, inst_ids_01, qprop_01_set)
        context.clean_lattice = True
        for inst_id_01, qprop_01 in it.product(inst_ids_01, qprop_01_set):
            if self.check_relation(inst_id_01, qprop_01):
                context.add_relation(inst_id_01, qprop_01)
        lattice = context.build_lattice()

        linkkeys = [self.measure_linkkey(c0_id, c1_id, c) for c in lattice]
        return context, lattice, max(linkkeys, key=attrgetter('fmeasure'))

    def extract_linkkeys(self, exporter=None):
        """Extract a link key for every alignment that can be processed.

        Classes covered by already registered link keys count as handled
        from the start.  Alignments are then taken in the order given by
        :meth:`next_alignment` until none is ready.  When an ``exporter`` is
        given, each context and lattice is exported as it is produced.
        Returns the container of registered link keys.
        """
        self._quantifiers = guard_operators(self._quantifiers)
        self._handled_classes = set()
        for linkkey in self.linkkeys.values():
            self._handled_classes |= linkkey.classes

        alignment = self.next_alignment()
        while alignment is not None:
            c0_id, c1_id = alignment
            context, lattice, linkkey = self.find_linkkey(c0_id, c1_id)
            self._linkkeys.set(tuple(alignment), linkkey)
            self._handled_classes |= set(alignment)
            if exporter is not None:
                exporter.export_fca_context(context)
                exporter.export_lattice(lattice)
            alignment = self.next_alignment()
        return self.linkkeys


class RCAOntologyLinker(AbstractOntologyLinker):
    """Linker that handles all class pairs at once through an RCA process.

    Formal contexts are built from value properties for every pair of
    classes, relational contexts from object properties between dependent
    classes, and the resulting lattices are searched for compatible
    combinations of concepts.
    """

    def __init__(self, ontology):
        """Store the ontology and start with no context built."""
        super().__init__(ontology)
        # Two-way mapping between (class_0, class_1) pairs and context names.
        self.fc_names = DDict()
        self.fcontexts, self.rcontexts = {}, {}

    def build_formal_contexts(self):
        """Build one formal context per ``(classes0, classes1)`` pair.

        Objects are the pairs of instances of the two classes; attributes
        are the ``(quantifier, value_property_0, value_property_1)`` triples.
        The context name is recorded in ``fc_names`` for every pair, but
        only non-empty contexts are stored in ``fcontexts``.
        """
        c0_ids, c1_ids = self.ogy.classes0, self.ogy.classes1
        for (c0_id, c1_id) in it.product(c0_ids, c1_ids):
            c0, c1 = self.ogy.get_class(c0_id), self.ogy.get_class(c1_id)
            vprops_q01 = list(it.product(self.quantifiers,
                                         c0.val_property_ids,
                                         c1.val_property_ids))
            insts_01 = list(it.product(c0.instance_ids, c1.instance_ids))

            fc_name = "K_%s_%s" % (c0.class_id, c1.class_id)
            self.fc_names.set((c0.class_id, c1.class_id), fc_name)
            context = FormalContext(fc_name, insts_01, vprops_q01)
            for inst01, vprop_q01 in it.product(insts_01, vprops_q01):
                if self.check_relation(inst01, vprop_q01):
                    context.add_relation(inst01, vprop_q01)

            if not context.is_empty:
                self.fcontexts[fc_name] = context

    def build_relational_contexts(self):
        """Build the relational contexts linking pairs of formal contexts.

        For every two class pairs ``(c0, c1)`` and ``(c2, c3)`` such that
        ``c2`` is a dependency of ``c0`` and ``c3`` a dependency of ``c1``,
        one relational context is created per pair of object properties
        linking ``c0`` to ``c2`` and ``c1`` to ``c3``.  Only non-empty
        contexts are stored in ``rcontexts``, keyed by the joined property
        names.
        """
        classes_ids_prod = it.product(self.fc_names.keys(), repeat=2)
        for ((c0_id, c1_id), (c2_id, c3_id)) in classes_ids_prod:
            c0, c1 = self.ogy.get_class(c0_id), self.ogy.get_class(c1_id)
            if c2_id not in c0.dependencies or c3_id not in c1.dependencies:
                continue

            # The two formal contexts do not depend on the property pair, so
            # resolve them once per class combination.
            fc0_name = self.fc_names.get_value((c0_id, c1_id))
            fc1_name = self.fc_names.get_value((c2_id, c3_id))
            fc0 = self.fcontexts.get(fc0_name, None)
            fc1 = self.fcontexts.get(fc1_name, None)
            if fc0 is None or fc1 is None:
                continue

            oprops_0 = self.ogy.linked_classes.get((c0_id, c2_id))
            oprops_1 = self.ogy.linked_classes.get((c1_id, c3_id))
            if oprops_0 is None or oprops_1 is None:
                # No object property recorded between the classes.
                continue

            for attr01 in it.product(oprops_0, oprops_1):
                rc_name = '_'.join(attr01)
                rcontext = RelationalContext(rc_name, fc0_name, fc1_name)
                rcontext.metadata = str(attr01)

                for (o01, v01) in it.product(fc0.objects, fc1.objects):
                    if self.ogy.has_properties(o01, v01, attr01):
                        rcontext.add_relation(o01, v01)

                if not rcontext.is_empty:
                    self.rcontexts[rc_name] = rcontext

    def search_linkkeys(self, lattices):
        """Score every combination of one concept per lattice.

        A combination is kept when the dependencies of its concepts are all
        among the names of its concepts.  Each kept combination is stored as
        ``(link keys, score)`` where the score is the mean f-measure of its
        link keys.  Returns ``(max_i, vector_linkkeys)`` with ``max_i`` the
        index of the best score (the last one in case of a tie).
        """
        vector_linkkeys, max_i = [], 0
        for combinations in it.product(*lattices.values()):
            ccpt_ids, ccpt_dep = set(), set()
            vector, score = [], 0.0

            for concept in combinations:
                if concept.names is not None:
                    ccpt_ids |= concept.names
                if concept.dependencies is not None:
                    ccpt_dep |= concept.dependencies
                (c0, c1) = self.fc_names.get_key(concept.context_id)
                vector.append(self.measure_linkkey(c0, c1, concept))
                score += vector[-1].fmeasure / len(combinations)

            if ccpt_dep <= ccpt_ids:
                vector_linkkeys.append((vector, score))
                if vector_linkkeys[max_i][1] <= score:
                    max_i = len(vector_linkkeys) - 1
        return max_i, vector_linkkeys

    def extract_linkkeys(self, exporter=None):
        """Run the whole RCA pipeline and return the candidate link keys.

        Returns ``(index, vectors)`` as produced by :meth:`search_linkkeys`.
        NOTE: when the ontology has no instance or no property an empty
        dict is returned instead; this is kept for backward compatibility,
        so callers must not unpack the result blindly.

        ``exporter`` may be ``None``; it is handed to the RCA process and,
        when set, used to export the lattices with the link keys found.
        """
        self._quantifiers = guard_operators(self._quantifiers)
        Logger.log('Ontology linker')
        Logger.log('Number of classes 0: %d' % len(self.ogy.classes0))
        Logger.log('Number of classes 1: %d' % len(self.ogy.classes1))
        Logger.log('Number of instances: %d' % self.ogy.nb_instances)
        Logger.log('Number of d-properties: %d' % self.ogy.nb_val_properties)
        Logger.log('Number of o-properties: %d' % self.ogy.nb_obj_properties)
        if self.ogy.nb_instances == 0 or self.ogy.nb_properties == 0:
            return {}

        Logger.log('Building formal contexts')
        self.build_formal_contexts()
        Logger.log('nb formal contexts:', len(self.fcontexts))
        Logger.log('Building relational contexts')
        self.build_relational_contexts()
        Logger.log('nb relational contexts:', len(self.rcontexts))
        Logger.log('Building RCA process')
        process = RCAProcess(self.fcontexts, self.rcontexts, self.quantifiers)
        process.clean_lattice = True
        Logger.log('Building lattices')
        lattices = process.build_lattices(exporter)
        Logger.log('Searching link keys')
        index, vectors = self.search_linkkeys(lattices)

        if exporter is not None:
            # This will reexport the lattices... with the compatible link keys
            exporter.export_lattices(lattices, index, vectors)
        return index, vectors