Source code for helper_functions.abbreviation_solver

import re, fnmatch, base64, requests
import logging


class DictionaryEntry(object):
    """An abbreviation (``key``) together with its candidate expansions.

    The entry keeps a list of candidates, a cursor (``_index``) into that
    list and the currently selected ``candidate``.  ``ambigue`` flags entries
    that still have more than one plausible expansion.

    :param str key: The abbreviation this entry stands for.
    :param value: Either a list of candidates or a string; a string that
        contains commas is split into several candidates.
    :type value: list or str
    :param original_term: Optional term stored as ``original_term`` (the
        attribute is only set when a truthy value is given).
    :raises AttributeError: If ``key`` or ``value`` is missing or ``value``
        is neither a list nor a string.
    """

    def __init__(self, key=None, value=None, original_term=None, *args, **kwargs):
        super(DictionaryEntry, self).__init__(*args, **kwargs)
        if not key or not value:
            raise AttributeError('You must specify a key and a value to create an object')
        if original_term:
            self.original_term = original_term
        self.key = key
        if isinstance(value, list):
            # Copy the list: add_candidate() appends to it, and the caller's
            # list must not grow as a side effect.
            self._candidates = list(value)
            self.ambigue = len(value) > 1
        elif isinstance(value, str):
            if ',' in value:
                self._candidates = [part.strip() for part in value.strip().split(',')]
                self.ambigue = True
            else:
                self._candidates = [value.strip()]
                self.ambigue = False
        else:
            raise AttributeError('Value must be a list or a string.')
        self.candidate = self._candidates[0]
        self._index = 0

    def set_object(self, start_str=None, end_str=None, choose=None):
        """Store the text span of the entry and optionally fix its candidate.

        :param int start_str: Start offset, stored as ``start_string``.
        :param int end_str: End offset, stored as ``end_string``.
        :param choose: Candidate to select: an ``int`` or digit-only string is
            used as index into the candidate list, any other string is matched
            case-insensitively against the candidates.  When falsy, the first
            candidate is selected and the entry is marked ambiguous.
        :return: The entry itself.
        :raises AttributeError: If a span boundary is missing or ``choose``
            has an unsupported type.
        """
        if start_str is None or end_str is None:
            raise AttributeError('You must specify a start and end strings to set an object.')
        self.start_string = start_str
        self.end_string = end_str
        if not choose:
            self.ambigue = True
            self._index = 0
            self.candidate = self._candidates[0]
            return self
        if isinstance(choose, int) or (isinstance(choose, str) and re.match(r'^\d+$', choose)):
            position = int(choose)
            self.candidate = self._candidates[position]
            # Normalise negative indices so the cursor stays usable.
            self._index = position % len(self._candidates)
        elif isinstance(choose, str):
            wanted = choose.strip().lower()
            # No break: as before, the last matching candidate wins.
            for position, cand in enumerate(self._candidates):
                if cand.strip().lower() == wanted:
                    self.candidate = cand.strip()
                    self._index = position
        else:
            raise AttributeError('Choose variable is of wrong type.')
        # The navigation methods are deliberately left intact here; replacing
        # them with None made every later next_candidate()/prev_candidate()
        # call fail with a TypeError.
        self.ambigue = False
        return self

    def add_candidate(self, term=None):
        """Append ``term`` to the candidates unless it is already present.

        :raises AttributeError: If ``term`` is empty.
        """
        if not term:
            raise AttributeError('You need to give a term to add.')
        if term not in self._candidates:
            self._candidates.append(term)
            self.ambigue = True

    def add_to_object(self, objct=None):
        """Merge the candidates of one or several other entries into this one.

        Each source entry is walked with ``next_candidate()`` starting at its
        current cursor position, which advances the cursor of the source.

        :param objct: A single entry or a list of entries.
        :raises AttributeError: If ``objct`` is missing.
        """
        if not objct:
            raise AttributeError('You need to give an object to add.')
        if not isinstance(objct, list):
            objct = [objct]
        for other in objct:
            more = True
            while more:
                self.add_candidate(other.candidate)
                more = other.next_candidate()

    def prev_candidate(self):
        """Step back one candidate.

        :return: The newly selected candidate, or ``False`` when the cursor
            already points at the first candidate.
        """
        if self._index - 1 >= 0:
            self._index -= 1
            self.candidate = self._candidates[self._index]
            return self.candidate
        return False

    def next_candidate(self):
        """Step forward one candidate.

        :return: The newly selected candidate, or ``False`` when the cursor
            already points at the last candidate.
        """
        if len(self._candidates) > self._index + 1:
            self._index += 1
            self.candidate = self._candidates[self._index]
            return self.candidate
        return False


class DictionaryObject(object):
    """Abbreviation dictionary loaded from a tab-separated file.

    Three lookup tables are built from the file:

    * ``_verz_dict``: abbreviation -> list of expansions
    * ``_verz_dict_left``: suffix abbreviation (rows whose key starts with
      ``-``) -> list of suffix expansions, both stored without the leading
      character
    * ``_verz_dict_lower``: lower-cased abbreviation -> list of the original
      spellings

    :param str dictionary: Path of the dictionary file.
    :raises AttributeError: If no dictionary is given.
    """

    def __init__(self, dictionary=None, *args, **kwargs):
        if not dictionary:
            raise AttributeError('You must specify a dictionary')
        self._index = False
        self._index_left = False
        self._verz_dict, self._verz_dict_left, self._verz_dict_lower = self._readdictionary(dictionary=dictionary)

    @staticmethod
    def _readdictionary(dictionary):
        """Parse the dictionary file into the three lookup tables.

        Every row is expected to look like ``<key><TAB><expansions>`` where
        the expansions are comma separated.  Rows that do not match are
        reported with a warning that carries the row number.

        :param str dictionary: Path of the dictionary file.
        :return: Tuple ``(verz_dict, verz_dict_left, verz_dict_lower)``.
        """
        verz_dict = dict()
        verz_dict_left = dict()
        verz_dict_lower = dict()
        # NOTE(review): the file is opened with the platform default encoding;
        # confirm which encoding the dictionary files actually use.
        with open(dictionary, 'r') as verz:
            for counter, row in enumerate(verz, start=1):
                found = re.match(r'^([^\t]+?)\t+([^\t]+?)$', row)
                if not found:
                    logging.warning('no match: {}'.format(counter))
                    continue
                key = found.group(1).strip()
                ent_clean = found.group(2).strip().replace(' etc.', '')
                expansions = [part.strip() for part in ent_clean.split(',')]
                if key.startswith('-'):
                    # Suffix rule: drop the first character of key and values.
                    verz_dict_left[key[1:]] = [part[1:] for part in expansions]
                else:
                    verz_dict[key] = expansions
                    verz_dict_lower.setdefault(key.lower(), []).append(key)
        return verz_dict, verz_dict_left, verz_dict_lower

    def search_abbrev(self, term=None):
        """Look up an abbreviation.

        The term is stripped and a trailing ``.`` is appended when missing.
        Exact matches and matches that differ only in case are merged into
        one entry.  If neither exists and the term is a single word, the
        suffix rules are tried: the matching suffix is cut off the term and
        replaced by each of its expansions.

        :param str term: The abbreviation to search for.
        :return: A ``DictionaryEntry`` or ``False`` when nothing matches.
        :raises AttributeError: If no term is given.
        """
        if not term:
            raise AttributeError('You must specify a search term.')
        term = term.strip()
        if not term.endswith('.'):
            term += '.'
        r = False
        # The stored lists are copied so that merging candidates into an
        # entry never modifies the dictionary itself.
        if term in self._verz_dict:
            r = DictionaryEntry(key=term, value=list(self._verz_dict[term]))
        for spelling in self._verz_dict_lower.get(term.lower(), []):
            if spelling == term:
                continue
            variant = DictionaryEntry(key=term, value=list(self._verz_dict[spelling]))
            if not r:
                r = variant
            else:
                r.add_to_object(variant)
        if r:
            return r
        if len(term.split()) == 1:
            for suffix, endings in self._verz_dict_left.items():
                if not suffix or not term.endswith(suffix):
                    continue
                # Replace the matched suffix itself; the stem length must not
                # depend on the length of the expansion.
                stem = term[:-len(suffix)]
                expanded = DictionaryEntry(key=term, value=[stem + u for u in endings])
                if not r:
                    r = expanded
                    r.ambigue = True
                else:
                    r.add_to_object(expanded)
        if r:
            return r
        return False


class QuerySketchEngine(object):
    """Small client for the concordance view of a Sketch Engine server.

    :param str query: The query string sent as the ``q`` parameter.
    :param int page_size: Number of lines requested per page.
    :param int max_depth: Upper bound for the number of pages that are read.
    :param str user: User name for HTTP basic authentication.
    :param str password: Password for HTTP basic authentication.
    :raises AttributeError: If user or password is missing.
    """
    sketch_url = 'http://ske.herkules.arz.oeaw.ac.at/bonito/run.cgi/view'
    data = {'corpname': 'amc_2.3', 'format': 'json'}
    max_page = None
    test_next = True

    def __init__(self, query, page_size=25, max_depth=5, user=None, password=None, *args, **kwargs):
        super(QuerySketchEngine, self).__init__(*args, **kwargs)
        if not user or not password:
            raise AttributeError('You need to specify user and password for the sketch engine')
        credentials = '{}:{}'.format(user, password).encode()
        # b64encode() adds no trailing newline, so the encoded value is used
        # in full; cutting the last character corrupted the credentials.
        base64string = base64.b64encode(credentials)
        self._headers = {"Authorization": b"Basic " + base64string, 'accept': 'application/json'}
        # Per-instance copy: query() writes into this dict and must not leak
        # parameters into the class default or into other instances.
        self.data = dict(self.data)
        self.query_url = query
        self.page_size = page_size
        self.max_depth = max_depth

    def query(self, page=1):
        """Request one result page and store the decoded answer in ``json``.

        ``json`` is set to ``False`` when the body is not valid JSON or
        contains an ``error`` key.  On the first successful answer
        ``max_page`` is set to the reported number of pages, capped at
        ``max_depth`` (1 when the answer reports no page count).

        :param int page: Page number to request.
        """
        self.data['q'] = self.query_url
        self.data['fromp'] = str(page)
        self.data['pagesize'] = str(self.page_size)
        self.page = page
        self.req_ = requests.get(self.sketch_url, params=self.data, headers=self._headers)
        try:
            self.json = self.req_.json()
        except ValueError:
            # requests raises a ValueError subclass for undecodable bodies.
            self.json = False
        if not self.json:
            return
        if 'error' in self.json:
            self.json = False
        elif not self.max_page:
            if 'numofpages' in self.json:
                self.max_page = min(int(self.json['numofpages']), self.max_depth)
            else:
                self.max_page = 1

    def next_page(self):
        """Load the following page, or clear ``test_next`` on the last page."""
        if self.page < self.max_page:
            self.query(page=self.page + 1)
        else:
            self.test_next = False

    def previous_page(self):
        """Load the preceding page, or clear ``test_next`` on the first page."""
        if self.page > 1:
            self.query(page=self.page - 1)
        else:
            self.test_next = False

    def find_plausible(self):
        """Count the keyword-in-context strings over all available pages.

        For every line the ``str`` values of all ``Kwic`` parts whose
        ``class`` is not ``attr`` are concatenated.  ``candidates`` becomes a
        list of ``(text, count)`` tuples sorted by descending count, or
        ``False`` when there is no result.
        """
        if not self.json:
            self.candidates = False
            return
        counts = {}
        # Also stop when a follow-up page could not be loaded (json is False).
        while self.test_next and self.json:
            for line in self.json.get('Lines', []):
                text = ''.join(
                    part['str'] for part in line['Kwic'] if part['class'] != 'attr').strip()
                counts[text] = counts.get(text, 0) + 1
            self.next_page()
        ranked = sorted(counts.items(), key=lambda tup: tup[1], reverse=True)
        self.candidates = ranked if ranked else False


class ResolveAbbreviations(object):
    """Used to resolve abbreviations in German Texts.

    For resolving it uses several resources:

    * A list of German words
    * A list of common German abbreviations
    * A list of abbreviations used in the Austrian Bibliographic Dictionary (ÖBL)
    * and an API that allows to query the Austrian Media Corpus (AMC_)

    *Example:* ::

        abbrev = ResolveAbbreviations(text='test text', person=['Müller', 'Peter'], user='user', password='pw')
        resolved_text = abbrev.resolve(always_amc=True)

    :param str text: The text that should be processed (unicode string)
    :param list person: List of names of the Person (used to create possible abbreviations of the Person names)
    :param dictionaries: Dictionary that should be used additionally
    :type dictionaries: str or tuple or list
    :param str user: User for the sketch engine
    :param str password: Password for the sketch engine

    .. _AMC: http://www.oeaw.ac.at/acdh/de/amc
    """
    pipeline = []
    dictionaries_lst = [
        (1, 'data/Abkürzungsverzeichnis_APIS_ed_9_1_17.tsv'),
        (2, 'data/abkuerzungen_edit.tsv')]
    stop_chars = ['(', ')', ';', '!', '?', '"', ',']

    def __init__(self, text=None, person=None, dictionaries=None, user=None, password=None, *args, **kwargs):
        if not text:
            raise AttributeError('You must specify a text')
        self.text = text
        if user and password:
            self.sketch_user = user
            self.sketch_password = password
        else:
            logging.warning('If no sketch engine user is specified, sketch engine will not be queried.')
            self.sketch_user = False
            self.sketch_password = False
        self.person_abbrev = False
        if person:
            # Work on a copy so the caller's list is not extended.  The joined
            # full name is kept as last element; resolve() cuts it off again
            # with ``self._person[:-1]``.
            self._person = list(person)
            self._person.append(' '.join(person))
            self.person_abbrev = [
                ' '.join([x[0].strip() + '.' for x in name.split()])
                for name in self._person]
        else:
            self._person = None
        # Materialised as a list so resolve() can be called more than once.
        self.abbrev = list(re.finditer(
            r'(([^\W\d]+\.\s?)*[ÖÄÜöüäa-zA-Z\.]+\.)', self.text, flags=re.M))
        with open('data/german.dic', 'r', encoding='Latin 1') as word_file:
            self.german_dic = word_file.read().splitlines()
        # Per-instance copy: extra dictionaries must not accumulate on the
        # class attribute across instances.
        self.dictionaries_lst = list(self.dictionaries_lst)
        if dictionaries:
            if type(dictionaries) == str:
                self.dictionaries_lst.append((999, dictionaries))
            elif type(dictionaries) == tuple:
                self.dictionaries_lst.append(dictionaries)
            elif type(dictionaries) == list:
                self.dictionaries_lst.extend(dictionaries)
        self._dictionaries = []
        for d in sorted(self.dictionaries_lst, key=lambda tup: tup[0]):
            self._dictionaries.append(DictionaryObject(d[1]))

    def save_text(self, resolve_all=True, path=None):
        """Write the resolved text to ``path``.

        Plain text parts are written as they are; for resolved abbreviations
        the current candidates are joined with a blank.

        :param resolve_all: Currently not used.
        :param str path: Target file.
        :raises AttributeError: If no path is given or resolve() has not
            produced a result yet.
        """
        if not path:
            raise AttributeError('You need to give a path.')
        if not getattr(self, '_text_resolved_raw', None):
            raise AttributeError('You need to run resolve first.')
        parts = []
        for x in self._text_resolved_raw:
            if type(x) == str:
                parts.append(x)
            else:
                parts.append(' '.join([y.candidate for y in x]))
        with open(path, 'w') as wf:
            wf.write(''.join(parts))

    def _extend(self, extend=None, final=False):
        """Fetch context words next to the abbreviation currently processed.

        :param tuple extend: ``(count, direction)`` with direction ``'left'``
            or ``'right'``.  The direction is switched when there is no text
            on the requested side.
        :param bool final: Set on the single retry with ``count + 1`` that is
            made when too few usable words were found.
        :return: ``(words, extend)`` where ``extend`` carries the requested
            count and the direction actually used.  ``words`` may be shorter
            than ``count`` (even empty).
        :raises AttributeError: If ``extend`` is missing.
        """
        if not extend:
            raise AttributeError('You need to pass an extend object.')
        count, direction = extend[0], extend[1]
        text_len = len(self.text)
        if (direction == 'left' and self.start_str > 0) or (self.end_str == text_len and direction == 'right'):
            words = self.text[:self.start_str].split()
            # Left context is the single count-th word before the
            # abbreviation, not a slice of the last ``count`` words.
            try:
                picked = [words[-count]]
            except IndexError:
                picked = []
            extend = (count, 'left')
        elif (direction == 'right' and self.end_str < text_len) or (self.start_str == 0 and direction == 'left'):
            words = self.text[self.end_str:].split()
            picked = words[:count]
            extend = (count, 'right')
        else:
            logging.warning('not captured: {}'.format(extend))
            return [], extend
        cleaned = []
        for word in picked:
            if word in self.stop_chars:
                continue
            for stop in self.stop_chars:
                # str.replace returns a new string; the result has to be kept.
                word = word.replace(stop, '')
            if word:
                cleaned.append(word)
        if len(cleaned) < extend[0] and not final:
            cleaned, _ = self._extend(extend=(extend[0] + 1, extend[1]), final=True)
        return cleaned, extend

    @staticmethod
    def _drop_context(items, count, side):
        """Return ``items`` without ``count`` context elements on ``side``."""
        if not count:
            return items
        if side == 'right':
            return items[:-count]
        if side == 'left':
            return items[count:]
        return items

    def query_amc(self, dict_objects=None, extend=False):
        """Use the sketch engine to pick the most plausible candidates.

        For every entry one query token is built that matches the
        abbreviation itself (as prefix pattern, only when it is longer than
        two characters) or any of its candidates.  The most frequent hit is
        split into words and applied to the entries position by position.

        :param list dict_objects: Entries of the abbreviation to resolve.
        :param extend: ``False`` or a ``(count, direction)`` tuple requesting
            context words, see :meth:`_extend`.
        :return: The entries without the temporarily added context entries.
        :raises AttributeError: If no entries are given.
        """
        if not dict_objects:
            raise AttributeError('You need to pass a list of dictionary object.')
        side = None
        context_len = 0
        if extend:
            txt_lst, extend = self._extend(extend=extend)
            side = extend[1]
            # Only strip as many entries later as were actually added here.
            context_len = len(txt_lst)
            for idx, tt in enumerate(txt_lst):
                d = DictionaryEntry(key=tt, value=tt.strip())
                if side == 'right':
                    dict_objects.append(d)
                elif side == 'left':
                    dict_objects.insert(idx, d)
        tokens = []
        for o in dict_objects:
            logging.debug('dict object: %s - extend: %s, key: %s', o.candidate, extend, o.key)
            key_1 = o.key[:-1] + '*' if o.key.endswith('.') else o.key
            alternatives = []
            if len(key_1) > 2:
                alternatives.append('word="{}"'.format(key_1))
            alternatives.append('word="{}"'.format(o.candidate))
            if o.ambigue:
                while o.next_candidate():
                    alternatives.append('word="{}"'.format(o.candidate))
            # Every token gets its own brackets, also when the key is short.
            tokens.append('[' + '|'.join(alternatives) + ']')
        query = 'q' + ''.join(tokens)
        logging.debug(query)
        res = QuerySketchEngine(query, user=self.sketch_user, password=self.sketch_password)
        res.query()
        res.find_plausible()
        dict_objects = self._drop_context(dict_objects, context_len, side)
        if res.candidates:
            cands_lst = self._drop_context(res.candidates[0][0].split(), context_len, side)
            # zip() guards against hits with more words than entries.
            for entry, cand in zip(dict_objects, cands_lst):
                cand = cand.strip()
                if cand not in entry._candidates:
                    entry.add_candidate(term=cand)
                entry.set_object(choose=cand, start_str=self.start_str, end_str=self.end_str)
            logging.debug('candidates')
        return dict_objects

    def resolve(self, always_amc=False):
        """
        Resolve function. Resolves the abbreviation in the text given in the __init__ function.

        :param always_amc: *(Boolean)* Specifies whether to also use the AMC for unambiguous abbreviations.
            Defaults to False.
        :return: Resolved text.
        :rtype: string
        """
        self._text_resolved_raw = []
        self.text_resolved = ''
        idx = 0
        for abb in self.abbrev:
            start_str = abb.span()[0]
            end_str = abb.span()[1]
            self.start_str = start_str
            self.end_str = end_str
            self._text_resolved_raw.append(self.text[idx:start_str])
            zw_list = abb.group(1).strip().split()
            if len(zw_list) > 1:
                # Try the complete multi-part abbreviation first.
                zw_list.insert(0, abb.group(1).strip())
            zw_res = []
            zw_ambigue = False
            logging.debug('%s', zw_list)
            for indx2, a in enumerate(zw_list):
                r = False
                if len(a.strip()) > 2:
                    for d in self._dictionaries:
                        if not r:
                            r = d.search_abbrev(term=a.strip())
                        else:
                            rr = d.search_abbrev(term=a.strip())
                            if rr:
                                r.add_to_object(rr)
                    if r:
                        test3 = True
                        zw_list_1 = []
                        while test3:
                            form_lst = fnmatch.filter(self.german_dic, r.candidate + '*')
                            form_lst.extend(fnmatch.filter(self.german_dic, r.candidate.lower() + '*'))
                            if len(form_lst) < 7:
                                for form in form_lst:
                                    zw_list_1.append(form)
                            if form_lst == r._candidates:
                                test3 = False
                            else:
                                test3 = r.next_candidate()
                        for u in zw_list_1:
                            r.add_candidate(term=u)
                    if not ' ' in a.strip() and not r:
                        if a.strip()[:-1] in self.german_dic or a.strip()[:-1].lower() in self.german_dic:
                            r = DictionaryEntry(value=a.strip(), key=a.strip())
                        else:
                            form_lst_2 = fnmatch.filter(self.german_dic, a.strip()[:-1] + '*')
                            form_lst_2.extend(fnmatch.filter(self.german_dic, a.strip().lower()[:-1] + '*'))
                            if not r and form_lst_2:
                                r = DictionaryEntry(key=a.strip(), value=form_lst_2)
                else:
                    # NOTE(review): the nesting of this branch was reconstructed
                    # from a whitespace-collapsed listing; it is assumed to be
                    # the ``else`` of the length check above - confirm against
                    # the original file.
                    if self.person_abbrev:
                        for p in self.person_abbrev:
                            if p == a.strip():
                                r = DictionaryEntry(key=a, value=' '.join(self._person[:-1]))
                                break
                if r:
                    zw_res.append(r)
                    if r.ambigue:
                        zw_ambigue = True
                    if indx2 == 0:
                        break
                elif (len(zw_list) == 1 and indx2 == 0) or (len(zw_list) > 1 and indx2 != 0) and not r:
                    # Nothing found: keep the abbreviation unchanged.
                    zw_res.append(DictionaryEntry(key=a, value=a))
            if (zw_ambigue or always_amc) and (self.sketch_user and self.sketch_password):
                logging.debug('%s', zw_res)
                if len(zw_res) == 1:
                    ll = self.query_amc(dict_objects=zw_res, extend=(1, 'left'))
                else:
                    ll = self.query_amc(dict_objects=zw_res, extend=False)
                if ll:
                    zw_res = ll
                else:
                    for ii in zw_res:
                        logging.debug(str(ll))
                        logging.debug('candidate_preserve: %s', ii.candidate)
                        ii.set_object(start_str=self.start_str, end_str=self.end_str)
            else:
                for ii in zw_res:
                    ii.set_object(start_str=self.start_str, end_str=self.end_str)
            self._text_resolved_raw.append(zw_res)
            idx = end_str
        # Keep the text that follows the last abbreviation (or the whole text
        # when no abbreviation was found).
        self._text_resolved_raw.append(self.text[idx:])
        for txt in self._text_resolved_raw:
            if type(txt) == str:
                self.text_resolved += txt
            else:
                for txt2 in txt:
                    if not self.text_resolved.endswith(' '):
                        self.text_resolved += ' '
                    self.text_resolved += txt2.candidate
        return self.text_resolved