# ******************************************************************************
# Copyright 2017-2018 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ******************************************************************************
import re
import string
import sys
from os import path
from typing import List, Tuple
import spacy
from nltk import WordNetLemmatizer
from nltk.stem.snowball import EnglishStemmer
from spacy.cli.download import download as spacy_download
from spacy.lang.en import LEMMA_EXC, LEMMA_INDEX, LEMMA_RULES
from spacy.lemmatizer import Lemmatizer
from nlp_architect.utils.generic import license_prompt
from joblib import Parallel, delayed
from functools import partial
from spacy.util import minibatch
from nlp_architect.common.core_nlp_doc import CoreNLPDoc
from pathlib import Path
class Vocabulary:
"""
A vocabulary that maps words to integer ids
"""
def __init__(self, start=0, include_oov=True):
self._vocab = {}
self._rev_vocab = {}
self.include_oov = include_oov
if include_oov:
self._vocab["<UNK>"] = start
self.oov_id = start
self._rev_vocab[start] = "<UNK>"
self.next = start + 1
else:
self.next = start
def add(self, word):
"""
Add word to vocabulary
Args:
word (str): word to add
Returns:
int: id of added word
"""
if word not in self._vocab:
self._vocab[word] = self.next
self._rev_vocab[self.next] = word
self.next += 1
return self._vocab.get(word)
def word_id(self, word):
"""
Get the word_id of given word
Args:
word (str): word from vocabulary
Returns:
int: int id of word
"""
if hasattr(self, "oov_id"):
return self._vocab.get(word, self.oov_id)
return self._vocab.get(word, None)
def __getitem__(self, item):
"""
Get the word_id of given word (same as `word_id`)
"""
return self.word_id(item)
def __len__(self):
vocab_size = len(self._vocab)
if hasattr(self, "include_oov") and self.include_oov:
vocab_size += 1
return vocab_size
def __iter__(self):
for word in self.vocab.keys():
yield word
@property
def max(self):
return self.next
def id_to_word(self, wid):
"""
Word-id to word (string)
Args:
wid (int): word id
Returns:
str: string of given word id
"""
return self._rev_vocab.get(wid)
@property
def vocab(self):
"""
dict: get the dict object of the vocabulary
"""
return self._vocab
def add_vocab_offset(self, offset):
"""
Adds an offset to the ints of the vocabulary
Args:
offset (int): an int offset
"""
new_vocab = {}
for k, v in self.vocab.items():
new_vocab[k] = v + offset
self.next += offset
self._vocab = new_vocab
self._rev_vocab = {v: k for k, v in new_vocab.items()}
def reverse_vocab(self):
"""
Return the vocabulary as a reversed dict object
Returns:
dict: reversed vocabulary object
"""
return self._rev_vocab
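# Illustrative usage sketch (not part of the original module): building a small
# Vocabulary and looking words up. With include_oov=True the id `start` is
# reserved for "<UNK>" and unknown words fall back to it.
def _demo_vocabulary():
    vocab = Vocabulary(start=0, include_oov=True)
    assert vocab.add("hello") == 1      # first real word gets start + 1
    assert vocab.add("world") == 2
    assert vocab.add("hello") == 1      # re-adding returns the existing id
    assert vocab["world"] == 2          # __getitem__ delegates to word_id
    assert vocab["missing"] == 0        # unknown words map to the OOV id
    assert vocab.id_to_word(2) == "world"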
all_letters = string.ascii_letters + " .,;'"
n_letters = len(all_letters)
def char_to_id(c):
"""return int id of given character
OOV char id = len(all_letters)
Args:
c (str): string character
Returns:
int: int value of given char
"""
char_idx = all_letters.find(c)
if char_idx == -1:
char_idx = n_letters
return char_idx
def id_to_char(c_id):
"""return character of given char id"""
if c_id < n_letters:
return all_letters[c_id]
return None
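# Illustrative sketch: char_to_id/id_to_char round-trip for characters in
# all_letters; anything else shares the single OOV id n_letters, which
# id_to_char maps back to None.
def _demo_char_ids():
    assert id_to_char(char_to_id("a")) == "a"
    assert char_to_id("1") == n_letters   # digits are not in all_letters
    assert id_to_char(n_letters) is None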
def try_to_load_spacy(model_name):
try:
spacy.load(model_name)
return True
except OSError:
return False
class SpacyInstance:
"""
Spacy pipeline wrapper which prompts the user for model download authorization.
Args:
model (str, optional): spacy model name (default: english small model)
disable (list of string, optional): pipeline annotators to disable
(default: [])
display_prompt (bool, optional): flag to display/skip license prompt
n_jobs (int, optional): maximum number of concurrent Python worker processes.
If -1 all CPUs are used.
batch_size (int, optional): number of docs per batch.
spacy_doc (bool, optional): if True, parser outputs `spacy.tokens.Doc`
instead of `CoreNLPDoc`
show_tok (bool, optional): include token text in `CoreNLPDoc` output
show_doc (bool, optional): include document text in `CoreNLPDoc` output
ptb_pos (bool, optional): convert spacy POS tags to Penn Treebank tags
"""
def __init__(
self,
model="en",
disable=None,
display_prompt=True,
n_jobs=8,
batch_size=1500,
spacy_doc=False,
show_tok=True,
show_doc=True,
ptb_pos=False,
):
if disable is None:
disable = []
try:
self._parser = spacy.load(model, disable=disable)
except OSError:
url = "https://spacy.io/models"
if display_prompt and license_prompt("Spacy {} model".format(model), url) is False:
sys.exit(0)
spacy_download(model)
print("Spacy model installed, please rerun your command.")
sys.exit(0)
self.n_jobs = n_jobs
self.batch_size = batch_size
self.spacy_doc = spacy_doc
self.show_tok = show_tok
self.show_doc = show_doc
self.ptb_pos = ptb_pos
@property
def parser(self):
"""return Spacy's instance parser"""
return self._parser
def parse(self, texts, output_dir=None):
"""
Parse a list of documents. If `n_jobs` is not 1, documents are parsed in parallel batches.
Args:
texts (list of str): documents to parse
output_dir (Path or str, optional): if given, parsed documents will be written here
"""
if self.n_jobs == 1:
return self.process_batch(texts, output_dir)
partitions = minibatch(texts, size=self.batch_size)
executor = Parallel(n_jobs=self.n_jobs, backend="multiprocessing", prefer="processes")
do = delayed(partial(self.process_batch))
tasks = (do(batch, output_dir, batch_i) for batch_i, batch in enumerate(partitions))
return [doc for batch in executor(tasks) for doc in batch]
def process_batch(self, texts, output_dir=None, batch_id=0):
"""Parse a batch of documents; if `output_dir` is given, each parsed document is written to a JSON file."""
parsed_docs = []
for i, doc in enumerate(self.parser.pipe(texts)):
parsed_doc = (
doc
if self.spacy_doc
else CoreNLPDoc.from_spacy(doc, self.show_tok, self.show_doc, self.ptb_pos)
)
parsed_docs.append(parsed_doc)
if output_dir:
out_path = Path(output_dir) / ("{}.{}.json".format(batch_id, i))
with open(out_path, "w", encoding="utf8") as f:
f.write(parsed_doc.pretty_json())
return parsed_docs
def tokenize(self, text: str) -> List[str]:
"""
Tokenize a sentence into tokens
Args:
text (str): text to tokenize
Returns:
list: a list of str tokens of input
"""
# pylint: disable=not-callable
return [t.text for t in self.parser(text)]
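# Minimal usage sketch of SpacyInstance (assumes a spaCy English model is
# installed locally; the example texts are made up for illustration).
def _demo_spacy_instance():
    parser = SpacyInstance(model="en", disable=["ner"], n_jobs=1)
    tokens = parser.tokenize("The quick brown fox jumps over the lazy dog.")
    docs = parser.parse(["First document.", "Second document."])  # list of CoreNLPDoc
    return tokens, docs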
stemmer = EnglishStemmer()
lemmatizer = WordNetLemmatizer()
spacy_lemmatizer = Lemmatizer(LEMMA_INDEX, LEMMA_EXC, LEMMA_RULES)
p = re.compile(r"[ \-,;.@&_]")
class Stopwords(object):
"""
Stop words list class.
"""
stop_words = []
@staticmethod
def get_words():
if not Stopwords.stop_words:
sw_path = path.join(path.dirname(path.realpath(__file__)), "resources", "stopwords.txt")
with open(sw_path) as fp:
stop_words = []
for w in fp:
stop_words.append(w.strip().lower())
Stopwords.stop_words = stop_words
return Stopwords.stop_words
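# Illustrative sketch (assumes resources/stopwords.txt ships with the package
# and is non-empty): the stop-word list is loaded once and cached on the class.
def _demo_stopwords():
    words = Stopwords.get_words()
    assert words is Stopwords.get_words()   # second call returns the cached list
    return words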
def simple_normalizer(text):
"""
Simple text normalizer. Runs each token of a phrase through the WordNet
lemmatizer and a stemmer; single all-uppercase tokens ending in "S" are left unchanged.
"""
if not str(text).isupper() or not str(text).endswith("S") or not len(text.split()) == 1:
tokens = list(filter(lambda x: len(x) != 0, p.split(text.strip())))
text = " ".join([stemmer.stem(lemmatizer.lemmatize(t)) for t in tokens])
return text
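# Illustrative sketch (requires the NLTK WordNet data): simple_normalizer stems
# the lemmatized tokens, while single all-caps tokens ending in "S" pass through.
def _demo_simple_normalizer():
    normalized = simple_normalizer("running dogs")   # e.g. "run dog"
    untouched = simple_normalizer("GPUS")            # returned unchanged
    return normalized, untouched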
def spacy_normalizer(text, lemma=None):
"""
Simple text normalizer using the spaCy lemmatizer. Runs each token of a phrase
through a lemmatizer and a stemmer.
Arguments:
text(string): the text to normalize.
lemma(string): lemma of the given text; if given, only the stemmer is
applied.
"""
if not str(text).isupper() or not str(text).endswith("S") or not len(text.split()) == 1:
tokens = list(filter(lambda x: len(x) != 0, p.split(text.strip())))
if lemma:
lemma = lemma.split(" ")
text = " ".join([stemmer.stem(lem) for lem in lemma])
else:
text = " ".join([stemmer.stem(spacy_lemmatizer(t, "NOUN")[0]) for t in tokens])
return text
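# Illustrative sketch: spacy_normalizer uses the spaCy rule-based lemmatizer
# instead of WordNet; passing a pre-computed lemma skips lemmatization and
# applies only the stemmer.
def _demo_spacy_normalizer():
    return spacy_normalizer("running dogs"), spacy_normalizer("running dogs", lemma="run dog")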
def read_sequential_tagging_file(file_path, ignore_line_patterns=None):
"""
Read a tab (or whitespace) separated sequential tagging file.
Returns a list of sentences, where each sentence is a list of per-token field tuples
Args:
file_path (str): input file path
ignore_line_patterns (list, optional): list of string patterns to ignore
Returns:
list of list of tuples
"""
if ignore_line_patterns:
assert isinstance(ignore_line_patterns, list), "ignore_line_patterns must be a list"
def _split_into_sentences(file_lines):
sentences = []
s = []
for line in file_lines:
if len(line) == 0:
sentences.append(s)
s = []
continue
s.append(line)
if len(s) > 0:
sentences.append(s)
return sentences
with open(file_path, encoding="utf-8") as fp:
data = fp.readlines()
data = [d.strip() for d in data]
if ignore_line_patterns:
for s in ignore_line_patterns:
data = [d for d in data if s not in d]
data = [tuple(d.split()) for d in data]
return _split_into_sentences(data)
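# Illustrative sketch (the file name and contents are hypothetical): each
# non-empty line holds whitespace-separated fields (typically token and tag)
# and a blank line terminates a sentence.
def _demo_read_sequential_tagging_file(tmp_path="example_tags.txt"):
    with open(tmp_path, "w", encoding="utf-8") as fp:
        fp.write("John B-PER\nlives O\nin O\nParis B-LOC\n\n")
    sentences = read_sequential_tagging_file(tmp_path)
    assert sentences == [[("John", "B-PER"), ("lives", "O"), ("in", "O"), ("Paris", "B-LOC")]]
    return sentences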
def word_vector_generator(data, lower=False, start=0):
"""
Word vector generator util.
Transforms a list of sentences into lists of int word ids and returns the
constructed vocabulary
Arguments:
data (list): list of list of strings
lower (bool, optional): transform strings into lower case
start (int, optional): vocabulary index start integer
Returns:
list of lists of int word ids and the Vocabulary of the detected words
"""
vocab = Vocabulary(start)
data_vec = []
for sentence in data:
sentence_vec = []
for w in sentence:
word = w
if lower:
word = word.lower()
wid = vocab.add(word)
sentence_vec.append(wid)
data_vec.append(sentence_vec)
return data_vec, vocab
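# Illustrative sketch of word_vector_generator: with start=1, id 1 is reserved
# for "<UNK>" and real (lower-cased) words are numbered from 2 onwards.
def _demo_word_vector_generator():
    vectors, vocab = word_vector_generator([["Hello", "world"], ["hello"]], lower=True, start=1)
    assert vectors == [[2, 3], [2]]
    assert vocab.word_id("hello") == 2
    return vectors, vocab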
def character_vector_generator(data, start=0):
"""
Character vector generator util.
Transforms a list of sentences into lists of int ids of the characters
of the words of each sentence, and returns the constructed vocabulary
Arguments:
data (list): list of list of strings
start (int, optional): vocabulary index start integer
Returns:
list: nested list of character ids (per sentence, per word, per character)
Vocabulary: constructed vocabulary
"""
vocab = Vocabulary(start)
data_vec = []
for sentence in data:
sentence_vec = []
for w in sentence:
word_vec = []
for char in w:
cid = vocab.add(char)
word_vec.append(cid)
sentence_vec.append(word_vec)
data_vec.append(sentence_vec)
return data_vec, vocab
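# Illustrative sketch of character_vector_generator: ids are nested per
# sentence, per word, per character (id 0 is reserved for "<UNK>").
def _demo_character_vector_generator():
    vectors, char_vocab = character_vector_generator([["ab", "ba"]], start=0)
    assert vectors == [[[1, 2], [2, 1]]]
    return vectors, char_vocab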
def bio_to_spans(text: List[str], tags: List[str]) -> List[Tuple[int, int, str]]:
"""
Convert BIO tagged list of strings into span starts and ends
Args:
text: list of words
tags: list of tags
Returns:
list: list of (start, end, tag) tuples of the detected spans
"""
pointer = 0
starts = []
for i, t in enumerate(tags):
if t.startswith("B-"):
starts.append((i, pointer))
pointer += len(text[i]) + 1
spans = []
for s_i, s_char in starts:
label_str = tags[s_i][2:]
e = 0
e_char = len(text[s_i + e])
while len(tags) > s_i + e + 1 and tags[s_i + e + 1].startswith("I-"):
e += 1
e_char += 1 + len(text[s_i + e])
spans.append((s_char, s_char + e_char, label_str))
return spans
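# Illustrative sketch of bio_to_spans: character offsets assume the tokens are
# joined with single spaces.
def _demo_bio_to_spans():
    text = ["John", "Smith", "visited", "Paris"]
    tags = ["B-PER", "I-PER", "O", "B-LOC"]
    assert bio_to_spans(text, tags) == [(0, 10, "PER"), (19, 24, "LOC")]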