Commit 9e8e8f40 authored by Lucas Terriel

add prototypes to process XML EAD with entity-fishing

parent 08df770d
# Default ignored files
/shelf/
/workspace.xml
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml
# Editor-based HTTP Client requests
/httpRequests/
<component name="InspectionProjectProfileManager">
  <settings>
    <option name="USE_PROJECT_PROFILE" value="false" />
    <version value="1.0" />
  </settings>
</component>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
  <component name="ProjectRootManager" version="2" project-jdk-name="Python 3.8 (ner4archives)" project-jdk-type="Python SDK" />
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
  <component name="ProjectModuleManager">
    <modules>
      <module fileurl="file://$PROJECT_DIR$/.idea/ner4archives.iml" filepath="$PROJECT_DIR$/.idea/ner4archives.iml" />
    </modules>
  </component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
  <component name="NewModuleRootManager">
    <content url="file://$MODULE_DIR$" />
    <orderEntry type="inheritedJdk" />
    <orderEntry type="sourceFolder" forTests="false" />
  </component>
</module>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
  <component name="VcsDirectoryMappings">
    <mapping directory="$PROJECT_DIR$" vcs="Git" />
    <mapping directory="$PROJECT_DIR$/InriaAlmanach/software_sources/hedgehog" vcs="Git" />
    <mapping directory="$PROJECT_DIR$/InriaAlmanach/software_sources/prototypes" vcs="Git" />
  </component>
</project>
\ No newline at end of file
# coding: utf-8
import sys

import requests

from nerd.nerd_client import NerdClient
from ParserClient import ParserClient
# CoNLLReader (used by extractPOS) is assumed to come from a local conll
# module, e.g. the UD conversion tools' lib/conll.py; adjust the import to
# this project's layout
from conll import CoNLLReader
class HistoryFishing:
    """Wrapper around entity-fishing: extracts entities from text, resolves
    their preferred terms and classes, and attaches dependency information."""
    nerdClient = NerdClient()
    parserClient = ParserClient()
    classesNERD = ["LOCATION", "PERSON", "TITLE", "ACRONYM", "ORGANISATION", "INSTITUTION", "PERSON_TYPE"]
    domainsNERD = ["History"]
    domains = []
    def fetchPreferredTerm(self, entity, lang):
        # Resolve the concept behind the entity and return its preferred
        # term in the requested language
        preferredTerm = ""
        if 'wikipediaExternalRef' in entity:
            concept, conceptStatus = self.nerdClient.get_concept(str(entity['wikipediaExternalRef']), lang)
            if conceptStatus == 200 and 'preferredTerm' in concept:
                preferredTerm = concept['preferredTerm']
        elif 'wikidataId' in entity:
            concept, conceptStatus = self.nerdClient.get_concept(str(entity['wikidataId']), lang)
            if conceptStatus == 200:
                for langs in concept.get('multilingual', []):
                    if langs["lang"] == lang:
                        preferredTerm = langs["term"]
                # preferredTerm starts as "", so test for emptiness, not None
                if not preferredTerm:
                    preferredTerm = concept.get('preferredTerm', "")
        return preferredTerm
    def fetchPredictedClass(self, entity, lang):
        # Ask the KID NER service for the predicted class of the concept
        urlBase = 'http://nerd.huma-num.fr/kid/service/ner?id='
        if 'wikidataId' in entity:
            conceptId = str(entity['wikidataId'])
        elif 'wikipediaExternalRef' in entity:
            # wikipediaExternalRef is an integer, so cast it before concatenating
            conceptId = str(entity['wikipediaExternalRef'])
        else:
            return ""
        r = requests.get(urlBase + conceptId)
        if r.status_code == 200 and 'predictedClass' in r.json():
            return r.json()['predictedClass']
        return ""
    def getField(self, entity, field):
        name = ""
        if field in entity:
            name = entity[field]
        return name

    def collectEntities(self, entities, listEntitiesInSentence):
        result = []
        for i in listEntitiesInSentence:
            result.append(entities[i])
        return result
    def extractPOS(self, text, lang, entities):
        # Parse the sentence containing the entity -> result in CoNLL-U
        parserResponse = self.parserClient.process(text, lang)
        # Parse the CoNLL-U result
        reader = CoNLLReader()
        sentences = reader.read_conll_u(parserResponse.split("\n"))
        results = {}
        # Get the head and the dependents of the entities
        for s in sentences:
            for e in entities:
                for nodeId in s.nodes():
                    # print(s.node[nodeId]['form'])
                    if s.node[nodeId]['form'] == e['rawName'] \
                            or e['rawName'].startswith(s.node[nodeId]['form']):
                        # We've found the node corresponding to the entity
                        dependents = []
                        head = {}
                        for h, d in s.edges():
                            # Head
                            if d == nodeId:
                                tmpHead = s[h][d]
                                head['form'] = str(s.node[h]['form'])
                                start, end = self.getRange(s.node[h])
                                head['offsetStart'] = start
                                head['offsetEnd'] = end
                                head['relation'] = tmpHead['deprel']
                                continue
                            # Dependents
                            elif h == nodeId:
                                tmpDep = s[h][d]
                                dep = {'form': str(s.node[d]['form'])}
                                start, end = self.getRange(s.node[d])
                                dep['offsetStart'] = start
                                dep['offsetEnd'] = end
                                dep['relation'] = tmpDep['deprel']
                                dependents.append(dep)
                        results[e['id']] = (head, dependents)
        return results
    def getRange(self, node):
        # With the 'ranges' tokenizer option, UDPipe stores each token's
        # character offsets as "TokenRange=start:end" in the MISC column
        substrToken = "TokenRange="
        if 'misc' in node and 'TokenRange' in node['misc']:
            tRange = node['misc'].split(substrToken)
            split = str(tRange[1]).split(":")
            return split[0], split[1]
        return 0, 0
    def process(self, text):
        print("Processing " + text)
        nerdResponse, statusCode = self.nerdClient.processText(text)
        if statusCode != 200:
            print("error " + str(statusCode) + ": " + str(nerdResponse))
            sys.exit()
        lang = 'en'
        if 'language' in nerdResponse:
            lang = nerdResponse['language']['lang']
        print("Language: " + lang)
        namedEntities = []
        # Working on the entities
        if 'entities' in nerdResponse:
            entityList = nerdResponse['entities']
            print('Found %d entities' % len(entityList))
            i = 0
            # Collect all the entities
            for entity in entityList:
                preferredTerm = self.fetchPreferredTerm(entity, lang)
                rawName = self.getField(entity, 'rawName')
                offsetS = entity['offsetStart']
                offsetE = entity['offsetEnd']
                namedEntity = {
                    "id": i,
                    "rawName": rawName,
                    "preferredName": preferredTerm,
                    "wikipediaExternalRef": self.getField(entity, 'wikipediaExternalRef'),
                    "offsetStart": offsetS,
                    "offsetEnd": offsetE
                }
                # If a NER type is present, add it
                if 'type' in entity:
                    namedEntity['type'] = entity['type']
                # if namedEntity['type'] in self.classesNERD:
                namedEntities.append(namedEntity)
                i = i + 1
            # Matching with tbx dictionary
            # sourceDictionary = "resources/WW2_glossary.xml"
            # tbxEntities = tbx.matchEntities(text, sourceDictionary, i)
            # namedEntities.append(tbxEntities)
        # Working on the sentences (absent if sentence segmentation was not
        # requested, in which case no dependencies are attached)
        sentences = nerdResponse.get('sentences', [])
        sentenceGroup = {}
        # Find the group of entities the current sentence contains
        for i in range(0, len(sentences)):
            startSentence = int(sentences[i]['offsetStart'])
            endSentence = int(sentences[i]['offsetEnd'])
            r = range(startSentence, endSentence)
            for entityIndex, entity in enumerate(namedEntities):
                if entity['offsetStart'] in r:
                    if i in sentenceGroup:
                        sentenceGroup[i].append(entityIndex)
                    else:
                        sentenceGroup[i] = [entityIndex]
        for sentenceIndex in sentenceGroup.keys():
            if len(sentenceGroup[sentenceIndex]) > 0:
                entitiesInSentence = self.collectEntities(namedEntities, sentenceGroup[sentenceIndex])
                offsetStart = sentences[sentenceIndex]['offsetStart']
                offsetEnd = sentences[sentenceIndex]['offsetEnd']
                result = self.extractPOS(text[offsetStart:offsetEnd], lang, entitiesInSentence)
                # Re-base the head offsets from sentence-local to document-wide
                for entityIndex in result.keys():
                    head, dependencies = result[entityIndex]
                    head['offsetStart'] = int(head['offsetStart']) + int(sentences[sentenceIndex]['offsetStart'])
                    head['offsetEnd'] = int(head['offsetEnd']) + int(sentences[sentenceIndex]['offsetStart'])
                    namedEntities[entityIndex]['dependencies'] = dependencies
                    namedEntities[entityIndex]['head'] = head
        for entity in namedEntities:
            print("-> " + str(entity['rawName']))
            if 'head' in entity:
                print("\t\thead: " + str(entity['head']['form']) + " ==> " + str(entity['head']['relation']))
            if 'dependencies' in entity:
                for dep in entity['dependencies']:
                    print("\t\tdependency: " + str(dep['form']) + " ==> " + str(dep['relation']))
        return namedEntities
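
# Minimal usage sketch (not part of the original file; the sample sentence is
# purely illustrative): extracts entities, resolves preferred terms, and
# prints each entity with its syntactic head and dependents
if __name__ == '__main__':
    hf = HistoryFishing()
    entities = hf.process("Winston Churchill met Charles de Gaulle in Paris.")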
import requests


class ParserClient:
    # UDPipe REST endpoint (LINDAT) used for tokenization, tagging and parsing
    clientLocation = "https://lindat.mff.cuni.cz/services/udpipe/api/process"
    models = {
        "en": "english",
        "es": "spanish-ancora",
        "fr": "french",
        "de": "german"
    }
    def process(self, text, lang):
        # Map the ISO language code to a UDPipe model, falling back to
        # English for unknown languages
        model = self.models.get(lang, self.models["en"])
        data = {'data': text,
                'tokenizer': 'normalized_spaces;ranges',
                'model': model,
                'parser': 'true',
                'tagger': 'true'
                }
        r = requests.post(self.clientLocation, data)
        statusCode = r.status_code
        message = r.reason
        if statusCode == 200:
            response = r.json()['result']
        else:
            response = str(statusCode) + ": " + message
        return response
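
# Minimal usage sketch (not part of the original file): the UDPipe service
# returns the parse as CoNLL-U text, one token per line
if __name__ == '__main__':
    parser = ParserClient()
    print(parser.process("Le général de Gaulle arrive à Paris.", "fr"))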
#!/usr/bin/env python
"""
DESCRIPTION
===========
Script to tag named entities directly in EAD XML files using the
Entity-fishing service.

INFO
====
Adapted from the parser.py script of the hedgehog repository:
https://github.com/lfoppiano/hedgehog/blob/master/edgehog/nationalArchives/parser.py

OVERVIEW
========
input  : EAD XML
output : EAD XML enriched with named-entity markup (output.xml)

See output.xml for a sample of the output.

# Author : Lucas Terriel / Inria Almanach
# Last release : 07/01/2020
# Status : in progress
"""
import sys
from bs4 import BeautifulSoup
from nerd.nerd_client import NerdClient
from HistoryFishing import HistoryFishing
# Call the Entity-fishing Python client (pass localhost in the api_Base
# parameter to work with a local instance)
# TODO(@Lucas): localhost does not work
client = NerdClient()
hf = HistoryFishing()

# input EAD XML file
inputFile = sys.argv[1]
mapping = {
    'geogname': ['LOCATION'],
    'persname': ['PERSON'],
    'corpname': ['BUSINESS', 'INSTITUTION', 'ORGANISATION']
}
inverseMapping = {}
for k, v in mapping.items():
    for x in v:
        inverseMapping.setdefault(x, []).append(k)
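# e.g. inverseMapping == {'LOCATION': ['geogname'], 'PERSON': ['persname'],
#                         'BUSINESS': ['corpname'], 'INSTITUTION': ['corpname'],
#                         'ORGANISATION': ['corpname']}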
def load(file):
    # Load an EAD XML file with BeautifulSoup
    with open(file) as ead:
        soup = BeautifulSoup(ead, "xml")
    return soup

# If working with a directory as input:
# files = listXMLfiles(input)
# for file in files:
soup = load(inputFile)
dids = soup.find_all('did')
listEntities = []
header = ['rawName', 'class', 'wikidataId', 'preferredTerm']

for did in dids:
    unittitles = did.find_all("unittitle")
    if len(unittitles) > 0:
        didTitles = ""
        for unittitle in unittitles:
            didTitles = didTitles + unittitle.get_text(strip=True) + " "
        # Normalise non-breaking spaces (raw and mojibake forms)
        didTitles = didTitles.replace('\xc2\xa0', ' ').replace('\xa0', ' ')
        try:
            content, response = client.disambiguate_text(didTitles)
            if response == 200:
                first_time = True
                controlAccess = None
                for entity in content['entities']:
                    # Create the <controlaccess> container once, on the first
                    # entity found for this <did>
                    if first_time:
                        controlAccess = soup.new_tag("controlaccess")
                        first_time = False
                    out = {
                        'rawName': entity["rawName"]
                    }
                    if "type" in entity:
                        out['class'] = entity["type"]
                    if 'wikidataId' in entity:
                        wikidataId = entity["wikidataId"]
                        preferredTerm = hf.fetchPreferredTerm(entity=entity, lang="fr")
                        predictedClass = hf.fetchPredictedClass(entity=entity, lang="fr")
                        out['predictedClass'] = predictedClass
                        out['wikidataId'] = wikidataId
                        out['preferredTerm'] = preferredTerm
                    # listEntities.append({'rawName': entity["rawName"], 'class': entity["type"], 'wikidataId': wikidataId, 'preferredTerm': preferredTerm, 'predictedClass': predictedClass})
                    # Pick the EAD tag from the predicted class when
                    # available, else from the NER type, else <subject>
                    if 'predictedClass' in out:
                        tag = inverseMapping.get(out['predictedClass'])
                    elif 'class' in out:
                        tag = inverseMapping.get(out['class'])
                    else:
                        tag = ['subject']
                    if tag is None:
                        tag = ['subject']
                    attrs = {}
                    if 'wikidataId' in out and len(out['wikidataId']) > 0:
                        attrs = {'authfilenumber': out['wikidataId'], 'source': 'wikidata'}
                    entityTag = soup.new_tag(name=tag[0], attrs=attrs)
                    if 'preferredTerm' in out:
                        entityTag.string = out['preferredTerm']
                    else:
                        entityTag.string = out['rawName']
                    controlAccess.append(entityTag)
                # Attach the populated <controlaccess> at the end of the
                # parent of this <did>
                if controlAccess is not None:
                    parent = did.parent
                    parent.insert(len(parent.contents), controlAccess)
        except Exception:
            # Skip this <did> when the API call or the tagging fails
            continue
### Writing output
## Enriched EAD XML
with open("output" + ".xml", 'w') as rawOutput:
    rawOutput.write(str(soup))
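
# Illustrative enriched fragment (values picked for illustration) appended
# to the parent of a tagged <did>:
#   <controlaccess>
#     <geogname authfilenumber="Q90" source="wikidata">Paris</geogname>
#     <persname authfilenumber="Q2042" source="wikidata">Charles de Gaulle</persname>
#   </controlaccess>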
#!/usr/bin/env python
"""
DESCRIPTION
===========
Script to check the alignment between the named entities spotted in the EAD
XML files, their Wikidata IDs, and their named-entity types, written to a
csv file.

INFO
====
Adapted from the parser.py script of the hedgehog repository:
https://github.com/lfoppiano/hedgehog/blob/master/edgehog/nationalArchives/parser.py

OVERVIEW
========
input  : raw text file with the content of the EAD XML
output : CSV (out.csv)

See out.csv for a sample of the output.

# Author : Lucas Terriel / Inria Almanach
# Last release : 07/01/2020
# Status : in progress
"""
import sys
import pandas as pd
from nerd.nerd_client import NerdClient
from HistoryFishing import HistoryFishing
# Call the Entity-fishing Python client (pass localhost in the api_Base
# parameter to work with a local instance)
# TODO(@Lucas): localhost does not work
client = NerdClient()
hf = HistoryFishing()

# Input raw text as a representation of the EAD XML
inputFile = sys.argv[1]

# Create empty data lists for the csv columns
names = []
type_entities = []
wiki_ids = []
wiki_ext_refs = []
with open(inputFile, 'r', encoding='utf-8') as text:
    for paragraph in text:
        try:
            # Call the Entity-fishing API disambiguation service
            content, response = client.disambiguate_text(paragraph)
            if response == 200:
                for entity in content['entities']:
                    # Some entities lack a type or a Wikidata id, so use
                    # .get() with a default instead of direct indexing,
                    # which would leave the column lists out of step
                    names.append(entity['rawName'])
                    type_entities.append(entity.get('type', ''))
                    wiki_ids.append(entity.get('wikidataId', ''))
                    wiki_ext_refs.append(entity.get('wikipediaExternalRef', ''))
        except Exception as e:
            print("### ERROR ###", e)
            continue
# write a csv
s1 = pd.Series(names, name='names')
s2 = pd.Series(type_entities, name='type_entities')
s3 = pd.Series(wiki_ids, name='wiki_ids')
s4 = pd.Series(wiki_ext_refs, name='wiki_ext_ref')
df = pd.concat([s1, s2, s3, s4], axis=1)
# df = pd.DataFrame({'RAW NAME ENTITY': names,
#                    'TYPE ENTITY': type_entities,
#                    'WIKIDATA ID': wiki_ids,
#                    'WIKIPEDIA EXTERNAL REF': wiki_ext_refs,
#                    })
df.to_csv('out.csv', index=False)
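
# Illustrative out.csv rows (values are hypothetical):
#   names,type_entities,wiki_ids,wiki_ext_ref
#   Paris,LOCATION,Q90,7954681
#   de Gaulle,PERSON,Q2042,47880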