import re, fnmatch, base64, requests
import logging
class DictionaryEntry(object):
    """One abbreviation (``key``) together with its candidate expansions.

    The entry keeps a list of candidates, a cursor (``_index``) into that
    list, the currently selected ``candidate`` and the flag ``ambigue`` which
    is ``True`` while more than one expansion is still possible.

    :param str key: The abbreviation this entry stands for.
    :param value: Either a list of expansions or a string; a string
        containing commas is split into several expansions.
    :type value: list or str
    :param str original_term: Optional original spelling; only stored when
        it is truthy.
    :raises AttributeError: If ``key`` or ``value`` is missing or ``value``
        is neither a list nor a string.
    """

    def __init__(self, key=None, value=None, original_term=None, *args, **kwargs):
        # The original called ``super(DictionaryEntry)`` (an unbound super);
        # use the same two-argument form as the other classes in this file.
        super(DictionaryEntry, self).__init__(*args, **kwargs)
        if not key or not value:
            raise AttributeError('You must specify a key and a value to create an object')
        if original_term:
            self.original_term = original_term
        self.key = key
        if isinstance(value, list):
            # Copy the list: add_candidate() appends to it and must not
            # modify the list owned by the caller (e.g. a dictionary row).
            self._candidates = list(value)
            self.ambigue = len(value) > 1
        elif isinstance(value, str):
            if ',' in value:
                self._candidates = [x.strip() for x in value.strip().split(',')]
                self.ambigue = True
            else:
                self._candidates = [value.strip()]
                self.ambigue = False
        else:
            raise AttributeError('Value must be a list or a string.')
        self.candidate = self._candidates[0]
        self._index = 0

    def set_object(self, start_str=None, end_str=None, choose=None):
        """Store the text span of the abbreviation and optionally pick a candidate.

        :param int start_str: Start offset of the abbreviation in the text.
        :param int end_str: End offset of the abbreviation in the text.
        :param choose: Index of the candidate (int or digit-only string) or
            the candidate itself (compared case-insensitively). When omitted,
            the first candidate is selected and the entry is marked ambiguous.
        :type choose: int or str
        :return: The entry itself.
        :raises AttributeError: If a span offset is missing or ``choose`` has
            an unsupported type.
        """
        if start_str is None or end_str is None:
            raise AttributeError('You must specify a start and end strings to set an object.')
        self.start_string = start_str
        self.end_string = end_str

        # 0 is a valid index although it is falsy; bool is deliberately not
        # accepted as an index.
        is_index = isinstance(choose, int) and not isinstance(choose, bool)
        if not choose and not is_index:
            self.ambigue = True
            self.candidate = self._candidates[0]
            return self

        if is_index:
            self.candidate = self._candidates[choose]
        elif isinstance(choose, str):
            if re.match(r'^\d+$', choose):
                self.candidate = self._candidates[int(choose)]
            else:
                wanted = choose.strip().lower()
                # No break: as before, the last matching candidate wins.
                for ff in self._candidates:
                    if ff.strip().lower() == wanted:
                        self.candidate = ff.strip()
        else:
            raise AttributeError('Choose variable is of wrong type.')
        self.ambigue = False
        # NOTE: a resolved entry shadows its navigation methods with None
        # (existing behaviour, kept for backward compatibility). Code that
        # wants to step through candidates must check for that.
        self.next_candidate = None
        self.prev_candidate = None
        return self

    def add_candidate(self, term=None):
        """Append ``term`` to the candidates unless it is already present.

        Adding a new candidate marks the entry as ambiguous.

        :raises AttributeError: If ``term`` is empty.
        """
        if not term:
            raise AttributeError('You need to give a term to add.')
        if term not in self._candidates:
            self._candidates.append(term)
            self.ambigue = True

    def add_to_object(self, objct=None):
        """Merge the candidates of one or several other entries into this one.

        Each other entry is walked from its current candidate to its last
        one, which advances the other entry's cursor.

        :param objct: A single entry or a list/tuple of entries.
        :raises AttributeError: If ``objct`` is missing.
        """
        if not objct:
            raise AttributeError('You need to give an object to add.')
        if not isinstance(objct, (list, tuple)):
            objct = [objct]
        for other in objct:
            self.add_candidate(other.candidate)
            # A resolved entry has next_candidate set to None (see
            # set_object); in that case only its chosen candidate is merged.
            while callable(other.next_candidate) and other.next_candidate():
                self.add_candidate(other.candidate)

    def prev_candidate(self):
        """Step back to the previous candidate.

        :return: The newly selected candidate, or ``False`` when the cursor
            is already at the first candidate.
        """
        # The original test (``_index - 1 > 0``) could never reach index 0.
        if self._index > 0:
            self._index -= 1
            self.candidate = self._candidates[self._index]
            return self.candidate
        return False

    def next_candidate(self):
        """Advance to the next candidate.

        :return: The newly selected candidate, or ``False`` when the cursor
            is already at the last candidate.
        """
        if len(self._candidates) > self._index + 1:
            self._index += 1
            self.candidate = self._candidates[self._index]
            return self.candidate
        return False
class DictionaryObject(object):
    """Abbreviation dictionary loaded from a tab-separated file.

    Every usable row has the form ``<abbreviation><TAB><expansions>`` where
    the expansions are comma separated. Rows whose abbreviation starts with
    ``-`` describe word endings (suffix abbreviations).

    :param str dictionary: Path of the dictionary file.
    :raises AttributeError: If no path is given.
    """

    def __init__(self, dictionary=None, *args, **kwargs):
        if not dictionary:
            raise AttributeError('You must specify a dictionary')
        self._index = False
        self._index_left = False
        (self._verz_dict,
         self._verz_dict_left,
         self._verz_dict_lower) = self._readdictionary(dictionary=dictionary)

    @staticmethod
    def _readdictionary(dictionary):
        """Parse the dictionary file.

        :param str dictionary: Path of the tab-separated file.
        :return: A tuple of three dicts: abbreviation -> expansions,
            suffix abbreviation (without the leading ``-``) -> expansions
            (without a leading ``-``), and lower-cased abbreviation ->
            list of the original spellings.
        :rtype: tuple
        """
        verz_dict = {}
        verz_dict_left = {}
        verz_dict_lower = {}
        row_pattern = re.compile(r'^([^\t]+?)\t+([^\t]+?)$')
        with open(dictionary, 'r') as verz:
            for counter, row in enumerate(verz, start=1):
                f = row_pattern.match(row)
                if not f:
                    logging.warning('no match: {}'.format(counter))
                    continue
                key = f.group(1).strip()
                ent_clean = f.group(2).strip().replace(' etc.', '')
                expansions = [ff.strip() for ff in ent_clean.split(',')]
                if key.startswith('-'):
                    # Remove only an actual leading hyphen; the original
                    # dropped the first character unconditionally and so
                    # damaged expansions written without one.
                    verz_dict_left[key[1:]] = [
                        ff[1:] if ff.startswith('-') else ff for ff in expansions
                    ]
                else:
                    verz_dict[key] = expansions
                    verz_dict_lower.setdefault(key.lower(), []).append(key)
        return verz_dict, verz_dict_left, verz_dict_lower

    def search_abbrev(self, term=None):
        """Look up an abbreviation.

        The term is stripped and a trailing ``.`` is added when missing.
        Exact matches and matches that differ only in case are merged into
        one entry. If there is none and the term is a single word, suffix
        abbreviations are tried.

        :param str term: The abbreviation to look up.
        :return: A :class:`DictionaryEntry`, or ``False`` if nothing matched.
        :raises AttributeError: If ``term`` is empty.
        """
        if not term:
            raise AttributeError('You must specify a search term.')
        term = term.strip()
        if not term.endswith('.'):
            term += '.'

        r = False
        if term in self._verz_dict:
            # Hand out copies so that candidates added to the entry later
            # never end up in the dictionary itself.
            r = DictionaryEntry(key=term, value=list(self._verz_dict[term]))
        for x in self._verz_dict_lower.get(term.lower(), []):
            if x == term:
                continue
            variant = DictionaryEntry(key=term, value=list(self._verz_dict[x]))
            if not r:
                r = variant
            else:
                r.add_to_object(variant)
        if r:
            return r

        if len(term.split()) == 1:
            for z, endings in self._verz_dict_left.items():
                # An empty suffix would match every term.
                if not z or not term.endswith(z):
                    continue
                # Replace the matched suffix by each expansion. The original
                # sliced with the length of the expansion instead of the
                # length of the suffix, which is only right by coincidence.
                stem = term[:-len(z)]
                expanded = DictionaryEntry(key=term, value=[stem + u for u in endings])
                if not r:
                    r = expanded
                    r.ambigue = True
                else:
                    r.add_to_object(expanded)
        return r if r else False
class QuerySketchEngine(object):
    """Small client for the concordance ("view") endpoint of a Sketch Engine.

    :param str query: The query string sent as parameter ``q``.
    :param int page_size: Number of lines requested per page.
    :param int max_depth: Upper bound for the number of pages that are read.
    :param str user: User name for HTTP basic authentication.
    :param str password: Password for HTTP basic authentication.
    :raises AttributeError: If user or password is missing.
    """

    sketch_url = 'http://ske.herkules.arz.oeaw.ac.at/bonito/run.cgi/view'
    data = {'corpname': 'amc_2.3', 'format': 'json'}
    max_page = None
    test_next = True

    def __init__(self, query, page_size=25, max_depth=5, user=None, password=None, *args, **kwargs):
        super(QuerySketchEngine, self).__init__(*args, **kwargs)
        if not user or not password:
            raise AttributeError('You need to specify user and password for the sketch engine')
        credentials = '{}:{}'.format(user, password).encode()
        # b64encode() does not append a newline, so nothing may be cut off;
        # the original ``[:-1]`` removed the last character of the token.
        self._headers = {
            "Authorization": b"Basic " + base64.b64encode(credentials),
            'accept': 'application/json',
        }
        # Per-instance copy: query() writes request parameters into this
        # dict and must not change the defaults shared by all instances.
        self.data = dict(self.data)
        self.query_url = query
        self.page_size = page_size
        self.max_depth = max_depth

    def query(self, page=1):
        """Request one result page and store the decoded answer in ``self.json``.

        ``self.json`` is set to ``False`` when the response is not valid JSON
        or contains an ``error`` key. On the first successful request
        ``max_page`` is set to the reported number of pages, capped at
        ``max_depth`` (or to 1 when no page count is reported).

        :param int page: The page to request (parameter ``fromp``).
        """
        self.data['q'] = self.query_url
        self.data['fromp'] = str(page)
        self.data['pagesize'] = str(self.page_size)
        self.page = page
        self.req_ = requests.get(self.sketch_url, params=self.data, headers=self._headers)
        try:
            self.json = self.req_.json()
        except ValueError:
            # Raised for a body that is not valid JSON.
            self.json = False
        if not self.json:
            return
        if 'error' in self.json:
            self.json = False
        elif not self.max_page:
            if 'numofpages' in self.json:
                self.max_page = min(int(self.json['numofpages']), self.max_depth)
            else:
                self.max_page = 1

    def next_page(self):
        """Load the following page, or clear ``test_next`` on the last page."""
        if self.page < self.max_page:
            self.query(page=self.page + 1)
        else:
            self.test_next = False

    def previous_page(self):
        """Load the preceding page, or clear ``test_next`` on the first page."""
        if self.page > 1:
            self.query(page=self.page - 1)
        else:
            self.test_next = False

    def find_plausible(self):
        """Count the keyword-in-context strings of all pages.

        For every line the ``str`` values of the ``Kwic`` parts whose class
        is not ``attr`` are concatenated. The result is stored in
        ``self.candidates`` as a list of ``(text, count)`` tuples sorted by
        descending count, or ``False`` when nothing was found.
        """
        if not self.json:
            self.candidates = False
            return
        counts = {}
        while self.test_next:
            if not self.json:
                # A later page failed to load; keep what was counted so far.
                self.test_next = False
                break
            for line in self.json.get('Lines', []):
                cand = ''.join(
                    part['str'] for part in line['Kwic'] if part['class'] != 'attr'
                ).strip()
                counts[cand] = counts.get(cand, 0) + 1
            self.next_page()
        ranked = sorted(counts.items(), key=lambda tup: tup[1], reverse=True)
        self.candidates = ranked if ranked else False
class ResolveAbbreviations(object):
    """Used to resolve abbreviations in German Texts. For resolving it uses several resources:

    * A list of German words
    * A list of common German abbreviations
    * A list of abbreviations used in the Austrian Bibliographic Dictionary (ÖBL)
    * and an API that allows to query the Austrian Media Corpus (AMC_)

    *Example:*
    ::

        abbrev = ResolveAbbreviations(text='test text', person=['Müller', 'Peter'], user='user', password='pw')
        resolved_text = abbrev.resolve(always_amc=True)

    :param str text: The text that should be processed (unicode string)
    :param list person: List of names of the Person (used to create possible abbreviations of the Person names)
    :param dictionaries: Dictionary that should be used additionally
    :type dictionaries: str or tuple or list
    :param str user: User for the sketch engine
    :param str password: Password for the sketch engine

    .. _AMC: http://www.oeaw.ac.at/acdh/de/amc
    """

    pipeline = []
    # (priority, path) pairs; dictionaries are consulted in ascending priority.
    dictionaries_lst = [
        (1, 'data/Abkürzungsverzeichnis_APIS_ed_9_1_17.tsv'),
        (2, 'data/abkuerzungen_edit.tsv')]
    # Punctuation that is never used as (part of) a context word.
    stop_chars = ['(', ')', ';', '!', '?', '"', ',']

    def __init__(self, text=None, person=None, dictionaries=None, user=None, password=None, *args, **kwargs):
        if not text:
            raise AttributeError('You must specify a text')
        self.text = text
        if user and password:
            self.sketch_user = user
            self.sketch_password = password
        else:
            logging.warning('If no sketch engine user is specified, sketch engine will not be queried.')
            self.sketch_user = False
            self.sketch_password = False

        self.person_abbrev = False
        if person:
            # Work on a copy; the original appended the joined name to the
            # list passed in by the caller.
            self._person = list(person)
            names = self._person + [' '.join(self._person)]
            # 'Müller' -> 'M.', 'Müller Peter' -> 'M. P.'
            self.person_abbrev = [
                ' '.join(part[0].strip() + '.' for part in name.split())
                for name in names
            ]
        else:
            self._person = None

        # Materialise the matches: a bare finditer() iterator is exhausted
        # after the first resolve() call.
        self.abbrev = list(re.finditer(
            r'(([^\W\d]+\.\s?)*[ÖÄÜöüäa-zA-Z\.]+\.)',
            self.text,
            flags=re.M))

        with open('data/german.dic', 'r', encoding='Latin 1') as dic_file:
            self.german_dic = dic_file.read().splitlines()

        # Extend an instance-level copy so that additional dictionaries do
        # not accumulate on the class across instances.
        dictionaries_lst = list(self.dictionaries_lst)
        if dictionaries:
            if isinstance(dictionaries, str):
                dictionaries_lst.append((999, dictionaries))
            elif isinstance(dictionaries, tuple):
                dictionaries_lst.append(dictionaries)
            elif isinstance(dictionaries, list):
                dictionaries_lst.extend(dictionaries)
        self.dictionaries_lst = dictionaries_lst
        self._dictionaries = [
            DictionaryObject(d[1])
            for d in sorted(dictionaries_lst, key=lambda tup: tup[0])
        ]

    def save_text(self, resolve_all=True, path=None):
        """Write the resolved text to ``path``.

        :param resolve_all: Currently unused.
        :param str path: Target file.
        :raises AttributeError: If no path is given or :meth:`resolve` has
            not produced a result yet.
        """
        if not path:
            raise AttributeError('You need to give a path.')
        raw = getattr(self, '_text_resolved_raw', None)
        if not raw:
            raise AttributeError('You need to run resolve first.')
        pieces = [
            x if isinstance(x, str) else ' '.join(y.candidate for y in x)
            for x in raw
        ]
        with open(path, 'w') as wf:
            wf.write(''.join(pieces))

    def _extend(self, extend=None, final=False):
        """Collect context words next to the current abbreviation.

        The current abbreviation is the span ``self.start_str`` to
        ``self.end_str`` of ``self.text``.

        :param tuple extend: ``(number_of_words, 'left' | 'right')``. The
            direction is flipped when there is no text on the requested side.
        :param bool final: Internal flag; ``True`` for the single retry that
            looks one word further when too few usable words were found.
        :return: ``(words, extend)`` where ``extend`` carries the direction
            that was actually used. ``words`` may hold fewer words than
            requested.
        :raises AttributeError: If ``extend`` is missing or its direction is
            neither ``'left'`` nor ``'right'``.
        """
        if not extend:
            raise AttributeError('You need to pass an extend object.')
        count, side = extend[0], extend[1]
        text_len = len(self.text)
        if (side == 'left' and self.start_str > 0) or (self.end_str == text_len and side == 'right'):
            words = self.text[:self.start_str].split()
            # The last ``count`` words (the original picked one element only
            # and failed on an empty list).
            txt_lst = words[-count:] if count > 0 else []
            extend = (count, 'left')
        elif (side == 'right' and self.end_str < text_len) or (self.start_str == 0 and side == 'left'):
            words = self.text[self.end_str:].split()
            txt_lst = words[:count] if count > 0 else []
            extend = (count, 'right')
        else:
            raise AttributeError('Unknown extend direction: {}'.format(extend))

        txt_lst_fin = []
        for z in txt_lst:
            if z in self.stop_chars:
                continue
            for zz in self.stop_chars:
                # str.replace returns a new string; the original discarded it.
                z = z.replace(zz, '')
            if z:
                txt_lst_fin.append(z)

        if len(txt_lst_fin) < count and not final:
            txt_lst_fin, _ = self._extend(extend=(count + 1, extend[1]), final=True)
        return txt_lst_fin, extend

    def query_amc(self, dict_objects=None, extend=False):
        """Let the corpus pick the most frequent expansion for the entries.

        A query with one token per entry is sent to the sketch engine; each
        token lists the abbreviation pattern and the candidates as
        alternatives. The most frequent hit is then mapped back onto the
        entries word by word.

        :param list dict_objects: Entries of the current abbreviation. The
            list itself is not modified.
        :param extend: ``False`` or ``(number_of_words, 'left' | 'right')``
            to add neighbouring words of the text as context to the query.
        :return: The entries (without the context words). Entries without a
            corpus hit fall back to their first candidate.
        :rtype: list
        :raises AttributeError: If ``dict_objects`` is empty.
        """
        if not dict_objects:
            raise AttributeError('You need to pass a list of dictionary object.')
        # Work on a copy so that context entries never leak into the
        # caller's list.
        entries = list(dict_objects)
        added = 0
        side = None
        if extend:
            txt_lst, extend = self._extend(extend=extend)
            side = extend[1]
            context = [DictionaryEntry(key=tt, value=tt.strip()) for tt in txt_lst]
            added = len(context)
            if side == 'right':
                entries = entries + context
            elif side == 'left':
                entries = context + entries

        tokens = []
        for o in entries:
            logging.debug('dict object: %s - extend: %s, key: %s', o.candidate, extend, o.key)
            key_1 = o.key[:-1] + '*' if o.key.endswith('.') else o.key
            alternatives = []
            if len(key_1) > 2:
                alternatives.append(key_1)
            if o.ambigue:
                # All candidates, independent of where the entry's cursor
                # currently stands, and without moving that cursor.
                alternatives.extend(o._candidates)
            else:
                alternatives.append(o.candidate)
            # Every token is bracketed on its own; the original produced an
            # unbalanced token for short keys that were not in first position.
            tokens.append('[' + '|'.join('word="{}"'.format(alt) for alt in alternatives) + ']')
        query = 'q' + ''.join(tokens)
        logging.debug(query)

        res = QuerySketchEngine(query, user=self.sketch_user, password=self.sketch_password)
        res.query()
        res.find_plausible()

        cands_lst = res.candidates[0][0].split() if res.candidates else []
        # Remove exactly as many context words as were added. ``added`` can
        # be smaller than requested, and slicing with ``[:-0]`` would empty
        # the list.
        if added:
            if side == 'right':
                cands_lst = cands_lst[:-added]
                entries = entries[:-added]
            elif side == 'left':
                cands_lst = cands_lst[added:]
                entries = entries[added:]

        # zip() guards against a hit with more words than there are entries.
        for entry, cand in zip(entries, cands_lst):
            cand = cand.strip()
            if cand not in entry._candidates:
                entry.add_candidate(term=cand)
            entry.set_object(choose=cand, start_str=self.start_str, end_str=self.end_str)
        for entry in entries[len(cands_lst):]:
            # No corpus evidence: same fallback as resolve() without the AMC.
            entry.set_object(start_str=self.start_str, end_str=self.end_str)
        return entries

    def _add_word_forms(self, entry):
        """Add word-list forms that start with one of the entry's candidates.

        Walks the candidates from the entry's current one onwards (advancing
        its cursor) and adds the matching words of ``self.german_dic`` as new
        candidates when fewer than seven words match a candidate.
        """
        extra = []
        more = True
        while more:
            forms = fnmatch.filter(self.german_dic, entry.candidate + '*')
            forms.extend(fnmatch.filter(self.german_dic, entry.candidate.lower() + '*'))
            if len(forms) < 7:
                extra.extend(forms)
            if forms == entry._candidates:
                more = False
            else:
                more = entry.next_candidate()
        for form in extra:
            entry.add_candidate(term=form)

    def _lookup_token(self, token):
        """Find expansions for one abbreviation token.

        Tokens longer than two characters are looked up in the dictionaries
        and, failing that, in the German word list. Shorter tokens are only
        compared with the initials of the person.

        :return: A :class:`DictionaryEntry` or ``False``.
        """
        term = token.strip()
        entry = False
        if len(term) > 2:
            for dictionary in self._dictionaries:
                found = dictionary.search_abbrev(term=term)
                if not entry:
                    entry = found
                elif found:
                    entry.add_to_object(found)
            if entry:
                self._add_word_forms(entry)
            if ' ' not in term and not entry:
                if term[:-1] in self.german_dic or term[:-1].lower() in self.german_dic:
                    # The token without its period is a regular word:
                    # keep the token unchanged.
                    entry = DictionaryEntry(value=term, key=term)
                else:
                    forms = fnmatch.filter(self.german_dic, term[:-1] + '*')
                    forms.extend(fnmatch.filter(self.german_dic, term.lower()[:-1] + '*'))
                    if forms:
                        entry = DictionaryEntry(key=term, value=forms)
        elif self.person_abbrev and term in self.person_abbrev:
            entry = DictionaryEntry(key=token, value=' '.join(self._person))
        return entry

    def resolve(self, always_amc=False):
        """
        Resolve function. Resolves the abbreviation in the text given in the __init__ function.

        :param always_amc: *(Boolean)* Specifies whether to also use the AMC for unambiguous abbreviations. Defaults to False.
        :return: Resolved text.
        :rtype: string
        """
        self._text_resolved_raw = []
        self.text_resolved = ''
        idx = 0
        for abb in self.abbrev:
            start_str, end_str = abb.span()
            self.start_str = start_str
            self.end_str = end_str
            self._text_resolved_raw.append(self.text[idx:start_str])

            phrase = abb.group(1).strip()
            zw_list = phrase.split()
            if len(zw_list) > 1:
                # Try the complete phrase first, then its parts.
                zw_list.insert(0, phrase)
            zw_res = []
            zw_ambigue = False
            logging.debug(zw_list)
            for indx2, a in enumerate(zw_list):
                r = self._lookup_token(a)
                if r:
                    zw_res.append(r)
                    if r.ambigue:
                        zw_ambigue = True
                    if indx2 == 0:
                        # The whole abbreviation is resolved; skip its parts.
                        break
                elif len(zw_list) == 1 or indx2 != 0:
                    # Unknown token: keep it unchanged. The unresolved whole
                    # phrase of a multi-part abbreviation is not kept, its
                    # parts follow.
                    zw_res.append(DictionaryEntry(key=a, value=a))

            if (zw_ambigue or always_amc) and (self.sketch_user and self.sketch_password):
                logging.debug(zw_res)
                context = (1, 'left') if len(zw_res) == 1 else False
                ll = self.query_amc(dict_objects=zw_res, extend=context)
                if ll:
                    zw_res = ll
                else:
                    for ii in zw_res:
                        logging.debug('candidate_preserve: %s', ii.candidate)
                        ii.set_object(start_str=self.start_str, end_str=self.end_str)
            else:
                for ii in zw_res:
                    ii.set_object(start_str=self.start_str, end_str=self.end_str)
            self._text_resolved_raw.append(zw_res)
            idx = end_str

        # Keep the text that follows the last abbreviation; the original
        # dropped it.
        self._text_resolved_raw.append(self.text[idx:])

        for txt in self._text_resolved_raw:
            if isinstance(txt, str):
                self.text_resolved += txt
            else:
                for txt2 in txt:
                    if not self.text_resolved.endswith(' '):
                        self.text_resolved += ' '
                    self.text_resolved += txt2.candidate
        return self.text_resolved