Source code for nlp_architect.utils.text

# ******************************************************************************
# Copyright 2017-2018 Intel Corporation
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ******************************************************************************
import re
import string
import sys
from os import path
from typing import List, Tuple

import spacy
from nltk import WordNetLemmatizer
from nltk.stem.snowball import EnglishStemmer
from spacy.cli import download as spacy_download
from spacy.lang.en import LEMMA_EXC, LEMMA_INDEX, LEMMA_RULES
from spacy.lemmatizer import Lemmatizer
from nlp_architect.utils.generic import license_prompt
from joblib import Parallel, delayed
from functools import partial
from spacy.util import minibatch
from nlp_architect.common.core_nlp_doc import CoreNLPDoc
from pathlib import Path

class Vocabulary:
    """
    A vocabulary that maps words to ints (storing a vocabulary)
    """

    def __init__(self, start=0, include_oov=True):
        self._vocab = {}
        self._rev_vocab = {}
        self.include_oov = include_oov
        if include_oov:
            self._vocab["<UNK>"] = start
            self.oov_id = start
            self._rev_vocab[start] = "<UNK>"
            self.next = start + 1  # id to assign to the next added word
        else:
            self.next = start

    def add(self, word):
        """
        Add word to vocabulary

        Args:
            word (str): word to add

        Returns:
            int: id of added word
        """
        if word not in self._vocab.keys():
            self._vocab[word] = self.next
            self._rev_vocab[self.next] = word
            self.next += 1
        return self._vocab.get(word)

    def word_id(self, word):
        """
        Get the word_id of given word

        Args:
            word (str): word from vocabulary

        Returns:
            int: int id of word
        """
        if hasattr(self, "oov_id"):
            return self._vocab.get(word, self.oov_id)
        return self._vocab.get(word, None)

    def __getitem__(self, item):
        """
        Get the word_id of given word (same as `word_id`)
        """
        return self.word_id(item)

    def __len__(self):
        vocab_size = len(self._vocab)
        if hasattr(self, "include_oov") and self.include_oov:
            vocab_size += 1
        return vocab_size

    def __iter__(self):
        for word in self.vocab.keys():
            yield word

    @property
    def max(self):
        return self.next

    def id_to_word(self, wid):
        """
        Word-id to word (string)

        Args:
            wid (int): word id

        Returns:
            str: string of given word id
        """
        return self._rev_vocab.get(wid)

    @property
    def vocab(self):
        """
        dict: get the dict object of the vocabulary
        """
        return self._vocab

    def add_vocab_offset(self, offset):
        """
        Adds an offset to the ints of the vocabulary

        Args:
            offset (int): an int offset
        """
        new_vocab = {}
        for k, v in self.vocab.items():
            new_vocab[k] = v + offset
        self.next += offset
        self._vocab = new_vocab
        self._rev_vocab = {v: k for k, v in new_vocab.items()}

    def reverse_vocab(self):
        """
        Return the vocabulary as a reversed dict object

        Returns:
            dict: reversed vocabulary object
        """
        return self._rev_vocab

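A minimal usage sketch for Vocabulary (assuming the module is importable as nlp_architect.utils.text): ids are handed out consecutively from `start`, and with `include_oov=True` the id `start` is reserved for the "<UNK>" token:

    from nlp_architect.utils.text import Vocabulary

    vocab = Vocabulary(start=0, include_oov=True)  # id 0 is reserved for "<UNK>"
    for token in ["the", "cat", "sat", "the"]:
        vocab.add(token)                           # repeated words keep their first id

    print(vocab["cat"])          # 2
    print(vocab.word_id("dog"))  # 0 -- unseen words fall back to oov_id
    print(vocab.id_to_word(3))   # "sat"
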
all_letters = string.ascii_letters + " .,;'"
n_letters = len(all_letters)

def char_to_id(c):
    """
    Return the int id of the given character; OOV characters map to n_letters
    (i.e. len(all_letters))

    Args:
        c (str): string character

    Returns:
        int: int value of given char
    """
    char_idx = all_letters.find(c)
    if char_idx == -1:
        char_idx = n_letters
    return char_idx

def id_to_char(c_id):
    """
    Return the character of the given char id
    """
    if c_id < n_letters:
        return all_letters[c_id]
    return None

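A short illustration of the character mapping above: characters outside `all_letters` all share the OOV id `n_letters`, which `id_to_char` maps back to None:

    from nlp_architect.utils.text import char_to_id, id_to_char, n_letters

    print(char_to_id("a"))        # 0 -- first character in all_letters
    print(char_to_id("#"))        # n_letters -- not in all_letters, so the OOV id
    print(id_to_char(0))          # "a"
    print(id_to_char(n_letters))  # None -- the OOV id has no character
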
def try_to_load_spacy(model_name):
    try:
        spacy.load(model_name)
        return True
    except OSError:
        return False

class SpacyInstance:
    """
    Spacy pipeline wrapper which prompts user for model download authorization.

    Args:
        model (str, optional): spacy model name (default: english small model)
        disable (list of string, optional): pipeline annotators to disable (default: [])
        display_prompt (bool, optional): flag to display/skip license prompt
        n_jobs (int, optional): maximum number of concurrent Python worker processes.
            If -1 all CPUs are used.
        batch_size (int, optional): number of docs per batch.
        spacy_doc (bool, optional): if True, parser outputs `spacy.tokens.Doc`
            instead of `CoreNLPDoc`
        show_tok (bool, optional): include token text in `CoreNLPDoc` output
        show_doc (bool, optional): include document text in `CoreNLPDoc` output
        ptb_pos (bool, optional): convert spacy POS tags to Penn Treebank tags
    """

    def __init__(
        self,
        model="en",
        disable=None,
        display_prompt=True,
        n_jobs=8,
        batch_size=1500,
        spacy_doc=False,
        show_tok=True,
        show_doc=True,
        ptb_pos=False,
    ):
        if disable is None:
            disable = []
        try:
            self._parser = spacy.load(model, disable=disable)
        except OSError:
            url = ""
            if display_prompt and license_prompt("Spacy {} model".format(model), url) is False:
                sys.exit(0)
            spacy_download(model)
            print("Spacy model installed, please rerun your command.")
            sys.exit(0)
        self.n_jobs = n_jobs
        self.batch_size = batch_size
        self.spacy_doc = spacy_doc
        self.show_tok = show_tok
        self.show_doc = show_doc
        self.ptb_pos = ptb_pos

    @property
    def parser(self):
        """return Spacy's instance parser"""
        return self._parser

    def parse(self, texts, output_dir=None):
        """
        Parse a list of documents. If more than 1 document is passed, use multi-processing.

        Args:
            texts (list of str): documents to parse
            output_dir (Path or str, optional): if given, parsed documents will be written here
        """
        if self.n_jobs == 1:
            return self.process_batch(texts, output_dir)
        partitions = minibatch(texts, size=self.batch_size)
        executor = Parallel(n_jobs=self.n_jobs, backend="multiprocessing", prefer="processes")
        do = delayed(partial(self.process_batch))
        tasks = (do(batch, output_dir, batch_i) for batch_i, batch in enumerate(partitions))
        return [doc for batch in executor(tasks) for doc in batch]

    def process_batch(self, texts, output_dir=None, batch_id=0):
        parsed_docs = []
        for i, doc in enumerate(self.parser.pipe(texts)):
            parsed_doc = (
                doc
                if self.spacy_doc
                else CoreNLPDoc.from_spacy(doc, self.show_tok, self.show_doc, self.ptb_pos)
            )
            parsed_docs.append(parsed_doc)
            if output_dir:
                out_path = Path(output_dir) / ("{}.{}.json".format(batch_id, i))
                with open(out_path, "w", encoding="utf8") as f:
                    f.write(parsed_doc.pretty_json())
        return parsed_docs

    def tokenize(self, text: str) -> List[str]:
        """
        Tokenize a sentence into tokens

        Args:
            text (str): text to tokenize

        Returns:
            list: a list of str tokens of input
        """
        # pylint: disable=not-callable
        return [t.text for t in self.parser(text)]

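A hedged usage sketch for SpacyInstance; it assumes the spaCy English model named by `model` is already installed (otherwise the constructor prompts for a download and exits):

    from nlp_architect.utils.text import SpacyInstance

    nlp = SpacyInstance(model="en", disable=["ner"], n_jobs=1, ptb_pos=True)

    tokens = nlp.tokenize("The quick brown fox jumps over the lazy dog.")
    docs = nlp.parse(["First document.", "Second document."])  # list of CoreNLPDoc objects

With n_jobs greater than 1, parse splits the input into minibatches of batch_size documents and parses them in parallel worker processes.
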
stemmer = EnglishStemmer()
lemmatizer = WordNetLemmatizer()
spacy_lemmatizer = Lemmatizer(LEMMA_INDEX, LEMMA_EXC, LEMMA_RULES)
p = re.compile(r"[ \-,;.@&_]")

class Stopwords(object):
    """
    Stop words list class.
    """

    stop_words = []

    @staticmethod
    def get_words():
        if not Stopwords.stop_words:
            sw_path = path.join(
                path.dirname(path.realpath(__file__)), "resources", "stopwords.txt"
            )
            with open(sw_path) as fp:
                stop_words = []
                for w in fp:
                    stop_words.append(w.strip().lower())
            Stopwords.stop_words = stop_words
        return Stopwords.stop_words

def simple_normalizer(text):
    """
    Simple text normalizer. Runs each token of a phrase through a WordNet
    lemmatizer and a stemmer.
    """
    if not str(text).isupper() or not str(text).endswith("S") or not len(text.split()) == 1:
        tokens = list(filter(lambda x: len(x) != 0, p.split(text.strip())))
        text = " ".join([stemmer.stem(lemmatizer.lemmatize(t)) for t in tokens])
    return text

def spacy_normalizer(text, lemma=None):
    """
    Simple text normalizer using the spacy lemmatizer. Runs each token of a
    phrase through a lemmatizer and a stemmer.

    Arguments:
        text (string): the text to normalize.
        lemma (string): lemma of the given text. If given, only the stemmer is run.
    """
    if not str(text).isupper() or not str(text).endswith("S") or not len(text.split()) == 1:
        tokens = list(filter(lambda x: len(x) != 0, p.split(text.strip())))
        if lemma:
            lemma = lemma.split(" ")
            text = " ".join([stemmer.stem(lem) for lem in lemma])
        else:
            text = " ".join([stemmer.stem(spacy_lemmatizer(t, "NOUN")[0]) for t in tokens])
    return text

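Example calls to the two normalizers above (a sketch; it assumes NLTK's WordNet data is available for the lemmatizer). Both split the phrase on the `p` delimiter pattern and stem every token; `spacy_normalizer` can instead stem a precomputed lemma string when one is passed:

    from nlp_architect.utils.text import simple_normalizer, spacy_normalizer

    print(simple_normalizer("word-embeddings"))  # lemmatize + stem each token, e.g. "word embed"
    print(spacy_normalizer("word embeddings"))   # spaCy lemmatizer + stemmer
    print(spacy_normalizer("word embeddings", lemma="word embedding"))  # only stems the lemma
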
def read_sequential_tagging_file(file_path, ignore_line_patterns=None):
    """
    Read a tab separated sequential tagging file.
    Returns a list of list of tuple of tags (sentences, words)

    Args:
        file_path (str): input file path
        ignore_line_patterns (list, optional): list of string patterns to ignore

    Returns:
        list of list of tuples
    """
    if ignore_line_patterns:
        assert isinstance(ignore_line_patterns, list), "ignore_line_patterns must be a list"

    def _split_into_sentences(file_lines):
        sentences = []
        s = []
        for line in file_lines:
            if len(line) == 0:
                sentences.append(s)
                s = []
                continue
            s.append(line)
        if len(s) > 0:
            sentences.append(s)
        return sentences

    with open(file_path, encoding="utf-8") as fp:
        data = fp.readlines()
        data = [d.strip() for d in data]
        if ignore_line_patterns:
            for s in ignore_line_patterns:
                data = [d for d in data if s not in d]
        data = [tuple(d.split()) for d in data]
    return _split_into_sentences(data)

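For reference, the expected input is a whitespace/tab separated tagging file with one token and its tag(s) per line and a blank line between sentences; a small sketch (the file name and the "-DOCSTART-" filter are illustrative):

    from nlp_architect.utils.text import read_sequential_tagging_file

    # train.txt (illustrative):
    #   John    B-PER
    #   lives   O
    #   in      O
    #   London  B-LOC
    #   (blank line between sentences)
    sentences = read_sequential_tagging_file("train.txt", ignore_line_patterns=["-DOCSTART-"])
    for sentence in sentences:
        words = [fields[0] for fields in sentence]
        tags = [fields[-1] for fields in sentence]
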
def word_vector_generator(data, lower=False, start=0):
    """
    Word vector generator util.
    Transforms a list of sentences into lists of int word ids and returns the
    constructed vocabulary

    Arguments:
        data (list): list of list of strings
        lower (bool, optional): transform strings into lower case
        start (int, optional): vocabulary index start integer

    Returns:
        list of lists of int word ids, and the Vocabulary of the detected words
    """
    vocab = Vocabulary(start)
    data_vec = []
    for sentence in data:
        sentence_vec = []
        for w in sentence:
            word = w
            if lower:
                word = word.lower()
            wid = vocab.add(word)
            sentence_vec.append(wid)
        data_vec.append(sentence_vec)
    return data_vec, vocab

def character_vector_generator(data, start=0):
    """
    Character word vector generator util.
    Transforms a list of sentences into lists of int ids of the characters of
    the words of each sentence, and returns the constructed vocabulary

    Arguments:
        data (list): list of list of strings
        start (int, optional): vocabulary index start integer

    Returns:
        list: a nested list of character ids (sentence -> word -> char id)
        Vocabulary: constructed vocabulary
    """
    vocab = Vocabulary(start)
    data_vec = []
    for sentence in data:
        sentence_vec = []
        for w in sentence:
            word_vec = []
            for char in w:
                cid = vocab.add(char)
                word_vec.append(cid)
            sentence_vec.append(word_vec)
        data_vec.append(sentence_vec)
    return data_vec, vocab

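Both generators above feed tokens (or their characters) through a shared Vocabulary and return the id sequences together with that vocabulary; a short sketch:

    from nlp_architect.utils.text import character_vector_generator, word_vector_generator

    sentences = [["The", "cat", "sat"], ["The", "dog", "ran"]]

    word_vecs, word_vocab = word_vector_generator(sentences, lower=True, start=1)
    char_vecs, char_vocab = character_vector_generator(sentences, start=1)

    # word_vecs[0] holds the ids of "the", "cat", "sat"
    # char_vecs[0][1] holds the ids of the characters "c", "a", "t"
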
def extract_nps(annotation_list, text=None):
    """
    Extract Noun Phrases from given text tokens and phrase annotations.
    Returns a list of tuples with start/end indexes.

    Args:
        annotation_list (list): a list of annotation tags in str
        text (list, optional): a list of token texts in str

    Returns:
        list of start/end markers of noun phrases; if text is provided,
        a list of noun phrase texts
    """
    np_starts = [i for i in range(len(annotation_list)) if annotation_list[i] == "B-NP"]
    np_markers = []
    for s in np_starts:
        i = 1
        while s + i < len(annotation_list) and annotation_list[s + i] == "I-NP":
            i += 1
        np_markers.append((s, s + i))
    return_markers = np_markers
    if text:
        assert len(text) == len(annotation_list), "annotations/text length mismatch"
        return_markers = [" ".join(text[s:e]) for s, e in np_markers]
    return return_markers

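A quick example of extract_nps over BIO-style NP chunk tags; passing the token texts returns the phrase strings instead of (start, end) markers:

    from nlp_architect.utils.text import extract_nps

    tokens = ["The", "quick", "fox", "jumped", "over", "the", "fence"]
    tags = ["B-NP", "I-NP", "I-NP", "O", "O", "B-NP", "I-NP"]

    print(extract_nps(tags))          # [(0, 3), (5, 7)]
    print(extract_nps(tags, tokens))  # ['The quick fox', 'the fence']
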
def bio_to_spans(text: List[str], tags: List[str]) -> List[Tuple[int, int, str]]:
    """
    Convert BIO tagged list of strings into span starts and ends

    Args:
        text: list of words
        tags: list of tags

    Returns:
        tuple: list of start, end and tag of detected spans
    """
    pointer = 0
    starts = []
    for i, t in enumerate(tags):
        if t.startswith("B-"):
            starts.append((i, pointer))
        pointer += len(text[i]) + 1
    spans = []
    for s_i, s_char in starts:
        label_str = tags[s_i][2:]
        e = 0
        e_char = len(text[s_i + e])
        while len(tags) > s_i + e + 1 and tags[s_i + e + 1].startswith("I-"):
            e += 1
            e_char += 1 + len(text[s_i + e])
        spans.append((s_char, s_char + e_char, label_str))
    return spans

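bio_to_spans turns token-level BIO tags into character offsets, assuming the tokens are joined by single spaces; a small check:

    from nlp_architect.utils.text import bio_to_spans

    tokens = ["John", "lives", "in", "New", "York"]
    tags = ["B-PER", "O", "O", "B-LOC", "I-LOC"]

    print(bio_to_spans(tokens, tags))  # [(0, 4, 'PER'), (14, 22, 'LOC')]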